From b32cc6a088d2bf249c1fbc4c51f106801829cd6e Mon Sep 17 00:00:00 2001 From: Eric Seppanen Date: Sun, 18 Apr 2021 19:15:53 -0700 Subject: [PATCH] pageserver: change over some error handling to anyhow+thiserror This is a first attempt at a new error-handling strategy: - Use anyhow::Error as the first choice for easy error handling - Use thiserror to generate local error types for anything that needs it (no error type is available to us) or will be inspected or matched on by higher layers. --- Cargo.lock | 1 + pageserver/Cargo.toml | 1 + pageserver/src/page_cache.rs | 38 ++++++++--------- pageserver/src/walreceiver.rs | 77 ++++++++++++++++------------------- 4 files changed, 53 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74b32d20e3..85c299c7a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1215,6 +1215,7 @@ dependencies = [ "slog-stdlog", "slog-term", "termion", + "thiserror", "tokio", "tokio-postgres", "tokio-stream", diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index 7f0b31e20c..f7f3be7f47 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -36,3 +36,4 @@ postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b anyhow = "1.0" crc32c = "0.6.0" walkdir = "2" +thiserror = "1.0" diff --git a/pageserver/src/page_cache.rs b/pageserver/src/page_cache.rs index e820b218e3..1c05ea7e8f 100644 --- a/pageserver/src/page_cache.rs +++ b/pageserver/src/page_cache.rs @@ -6,25 +6,22 @@ // per-entry mutex. 
// +use crate::{walredo, PageServerConf}; +use anyhow::bail; +use bytes::Bytes; use core::ops::Bound::Included; +use crossbeam_channel::unbounded; +use crossbeam_channel::{Receiver, Sender}; +use lazy_static::lazy_static; +use log::*; +use rand::Rng; use std::collections::{BTreeMap, HashMap}; -use std::error::Error; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::{Arc, Condvar, Mutex}; use std::thread; use std::time::Duration; use std::{convert::TryInto, ops::AddAssign}; -// use tokio::sync::RwLock; -use bytes::Bytes; -use lazy_static::lazy_static; -use log::*; -use rand::Rng; - -use crate::{walredo, PageServerConf}; - -use crossbeam_channel::unbounded; -use crossbeam_channel::{Receiver, Sender}; // Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call. static TIMEOUT: Duration = Duration::from_secs(60); @@ -248,7 +245,7 @@ impl PageCache { // // Returns an 8k page image // - pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result> { + pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result { self.num_getpage_requests.fetch_add(1, Ordering::Relaxed); // Look up cache entry. If it's a page image, return that. 
If it's a WAL record, @@ -276,11 +273,11 @@ impl PageCache { shared = wait_result.0; if wait_result.1.timed_out() { - return Err(format!( + bail!( "Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive", lsn >> 32, lsn & 0xffff_ffff - ))?; + ); } } if waited { @@ -288,11 +285,11 @@ impl PageCache { } if lsn < shared.first_valid_lsn { - return Err(format!( + bail!( "LSN {:X}/{:X} has already been removed", lsn >> 32, lsn & 0xffff_ffff - ))?; + ); } let pagecache = &shared.pagecache; @@ -347,12 +344,12 @@ impl PageCache { error!( "could not apply WAL to reconstruct page image for GetPage@LSN request" ); - return Err("could not apply WAL to reconstruct page image".into()); + bail!("could not apply WAL to reconstruct page image"); } }; } else { // No base image, and no WAL record. Huh? - return Err(format!("no page image or WAL record for requested page"))?; + bail!("no page image or WAL record for requested page"); } } @@ -432,10 +429,7 @@ impl PageCache { // Adds a WAL record to the page cache // pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) { - let key = CacheKey { - tag, - lsn: rec.lsn, - }; + let key = CacheKey { tag, lsn: rec.lsn }; let entry = CacheEntry::new(key.clone()); entry.content.lock().unwrap().wal_record = Some(rec); diff --git a/pageserver/src/walreceiver.rs b/pageserver/src/walreceiver.rs index 56e27150e5..8c869c7346 100644 --- a/pageserver/src/walreceiver.rs +++ b/pageserver/src/walreceiver.rs @@ -1,28 +1,25 @@ -use std::str::FromStr; - -// -// WAL receiver -// -// The WAL receiver connects to the WAL safekeeper service, and streams WAL. -// For each WAL record, it decodes the record to figure out which data blocks -// the record affects, and adds the records to the page cache. -// -use log::*; - -use tokio::runtime; -use tokio::time::{sleep, Duration}; -use tokio_stream::StreamExt; +//! +//! WAL receiver +//! +//! The WAL receiver connects to the WAL safekeeper service, and streams WAL. +//! 
For each WAL record, it decodes the record to figure out which data blocks +//! the record affects, and adds the records to the page cache. +//! use crate::page_cache; use crate::page_cache::BufferTag; use crate::waldecoder::{decode_wal_record, WalStreamDecoder}; use crate::PageServerConf; - +use anyhow::Error; +use log::*; use postgres_protocol::message::backend::ReplicationMessage; use postgres_types::PgLsn; +use std::str::FromStr; +use tokio::runtime; +use tokio::time::{sleep, Duration}; use tokio_postgres::replication::{PgTimestamp, ReplicationStream}; - -use tokio_postgres::{Error, NoTls, SimpleQueryMessage, SimpleQueryRow}; +use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow}; +use tokio_stream::StreamExt; // // This is the entry point for the WAL receiver thread. @@ -49,10 +46,7 @@ pub fn thread_main(conf: &PageServerConf, wal_producer_connstr: &str) { }); } -async fn walreceiver_main( - conf: &PageServerConf, - wal_producer_connstr: &str, -) -> Result<(), Error> { +async fn walreceiver_main(conf: &PageServerConf, wal_producer_connstr: &str) -> Result<(), Error> { // Connect to the database in replication mode. 
info!("connecting to {:?}", wal_producer_connstr); let connect_cfg = format!("{} replication=true", wal_producer_connstr); @@ -109,10 +103,7 @@ async fn walreceiver_main( let startpoint = PgLsn::from(startpoint); let query = format!("START_REPLICATION PHYSICAL {}", startpoint); - let copy_stream = rclient - .copy_both_simple::(&query) - .await - .unwrap(); + let copy_stream = rclient.copy_both_simple::(&query).await?; let physical_stream = ReplicationStream::new(copy_stream); tokio::pin!(physical_stream); @@ -207,14 +198,13 @@ async fn walreceiver_main( let write_lsn = last_lsn; let flush_lsn = last_lsn; let apply_lsn = PgLsn::INVALID; - let ts = PgTimestamp::now().unwrap(); + let ts = PgTimestamp::now()?; const NO_REPLY: u8 = 0u8; physical_stream .as_mut() .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY) - .await - .unwrap(); + .await?; } } _ => (), @@ -236,33 +226,36 @@ pub struct IdentifySystem { dbname: Option, } +/// There was a problem parsing the response to +/// a postgres IDENTIFY_SYSTEM command. +#[derive(Debug, thiserror::Error)] +#[error("IDENTIFY_SYSTEM parse error")] +pub struct IdentifyError; + /// Run the postgres `IDENTIFY_SYSTEM` command pub async fn identify_system(client: &tokio_postgres::Client) -> Result { let query_str = "IDENTIFY_SYSTEM"; let response = client.simple_query(query_str).await?; // get(N) from row, then parse it as some destination type. - fn get_parse(row: &SimpleQueryRow, idx: usize) -> Option + fn get_parse(row: &SimpleQueryRow, idx: usize) -> Result where T: FromStr, { - let val = row.get(idx)?; - val.parse::().ok() + let val = row.get(idx).ok_or(IdentifyError)?; + val.parse::().or(Err(IdentifyError)) } - // FIXME: turn unwrap() into errors. - // All of the tokio_postgres::Error builders are private, so we - // can't create them here. We'll just have to create our own error type. 
- if let SimpleQueryMessage::Row(first_row) = response.get(0).unwrap() { + // Extract the row contents into an IdentifySystem struct. + // get_parse returns a Result, so `?` propagates IdentifyError to the caller. + if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) { Ok(IdentifySystem { - systemid: get_parse(first_row, 0).unwrap(), - timeline: get_parse(first_row, 1).unwrap(), - xlogpos: get_parse(first_row, 2).unwrap(), - dbname: get_parse(first_row, 3), + systemid: get_parse(first_row, 0)?, + timeline: get_parse(first_row, 1)?, + xlogpos: get_parse(first_row, 2)?, + dbname: get_parse(first_row, 3).ok(), }) } else { - // FIXME: return an error - panic!("identify_system returned non-row response"); + Err(IdentifyError)? } }