pageserver: change over some error handling to anyhow+thiserror

This is a first attempt at a new error-handling strategy:
- Use anyhow::Error as the first choice for easy error handling
- Use thiserror to generate local error types for anything that
  needs it (no error type is available to us) or will be inspected
  or matched on by higher layers.
This commit is contained in:
Eric Seppanen
2021-04-18 19:15:53 -07:00
parent 3c7f810849
commit b32cc6a088
4 changed files with 53 additions and 64 deletions

1
Cargo.lock generated
View File

@@ -1215,6 +1215,7 @@ dependencies = [
"slog-stdlog",
"slog-term",
"termion",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-stream",

View File

@@ -36,3 +36,4 @@ postgres = { git = "https://github.com/zenithdb/rust-postgres.git", rev="a0d067b
anyhow = "1.0"
crc32c = "0.6.0"
walkdir = "2"
thiserror = "1.0"

View File

@@ -6,25 +6,22 @@
// per-entry mutex.
//
use crate::{walredo, PageServerConf};
use anyhow::bail;
use bytes::Bytes;
use core::ops::Bound::Included;
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
use std::{convert::TryInto, ops::AddAssign};
// use tokio::sync::RwLock;
use bytes::Bytes;
use lazy_static::lazy_static;
use log::*;
use rand::Rng;
use crate::{walredo, PageServerConf};
use crossbeam_channel::unbounded;
use crossbeam_channel::{Receiver, Sender};
// Timeout when waiting or WAL receiver to catch up to an LSN given in a GetPage@LSN call.
static TIMEOUT: Duration = Duration::from_secs(60);
@@ -248,7 +245,7 @@ impl PageCache {
//
// Returns an 8k page image
//
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> Result<Bytes, Box<dyn Error>> {
pub fn get_page_at_lsn(&self, tag: BufferTag, lsn: u64) -> anyhow::Result<Bytes> {
self.num_getpage_requests.fetch_add(1, Ordering::Relaxed);
// Look up cache entry. If it's a page image, return that. If it's a WAL record,
@@ -276,11 +273,11 @@ impl PageCache {
shared = wait_result.0;
if wait_result.1.timed_out() {
return Err(format!(
bail!(
"Timed out while waiting for WAL record at LSN {:X}/{:X} to arrive",
lsn >> 32,
lsn & 0xffff_ffff
))?;
);
}
}
if waited {
@@ -288,11 +285,11 @@ impl PageCache {
}
if lsn < shared.first_valid_lsn {
return Err(format!(
bail!(
"LSN {:X}/{:X} has already been removed",
lsn >> 32,
lsn & 0xffff_ffff
))?;
);
}
let pagecache = &shared.pagecache;
@@ -347,12 +344,12 @@ impl PageCache {
error!(
"could not apply WAL to reconstruct page image for GetPage@LSN request"
);
return Err("could not apply WAL to reconstruct page image".into());
bail!("could not apply WAL to reconstruct page image");
}
};
} else {
// No base image, and no WAL record. Huh?
return Err(format!("no page image or WAL record for requested page"))?;
bail!("no page image or WAL record for requested page");
}
}
@@ -432,10 +429,7 @@ impl PageCache {
// Adds a WAL record to the page cache
//
pub fn put_wal_record(&self, tag: BufferTag, rec: WALRecord) {
let key = CacheKey {
tag,
lsn: rec.lsn,
};
let key = CacheKey { tag, lsn: rec.lsn };
let entry = CacheEntry::new(key.clone());
entry.content.lock().unwrap().wal_record = Some(rec);

View File

@@ -1,28 +1,25 @@
use std::str::FromStr;
//
// WAL receiver
//
// The WAL receiver connects to the WAL safekeeper service, and streams WAL.
// For each WAL record, it decodes the record to figure out which data blocks
// the record affects, and adds the records to the page cache.
//
use log::*;
use tokio::runtime;
use tokio::time::{sleep, Duration};
use tokio_stream::StreamExt;
//!
//! WAL receiver
//!
//! The WAL receiver connects to the WAL safekeeper service, and streams WAL.
//! For each WAL record, it decodes the record to figure out which data blocks
//! the record affects, and adds the records to the page cache.
//!
use crate::page_cache;
use crate::page_cache::BufferTag;
use crate::waldecoder::{decode_wal_record, WalStreamDecoder};
use crate::PageServerConf;
use anyhow::Error;
use log::*;
use postgres_protocol::message::backend::ReplicationMessage;
use postgres_types::PgLsn;
use std::str::FromStr;
use tokio::runtime;
use tokio::time::{sleep, Duration};
use tokio_postgres::replication::{PgTimestamp, ReplicationStream};
use tokio_postgres::{Error, NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_postgres::{NoTls, SimpleQueryMessage, SimpleQueryRow};
use tokio_stream::StreamExt;
//
// This is the entry point for the WAL receiver thread.
@@ -49,10 +46,7 @@ pub fn thread_main(conf: &PageServerConf, wal_producer_connstr: &str) {
});
}
async fn walreceiver_main(
conf: &PageServerConf,
wal_producer_connstr: &str,
) -> Result<(), Error> {
async fn walreceiver_main(conf: &PageServerConf, wal_producer_connstr: &str) -> Result<(), Error> {
// Connect to the database in replication mode.
info!("connecting to {:?}", wal_producer_connstr);
let connect_cfg = format!("{} replication=true", wal_producer_connstr);
@@ -109,10 +103,7 @@ async fn walreceiver_main(
let startpoint = PgLsn::from(startpoint);
let query = format!("START_REPLICATION PHYSICAL {}", startpoint);
let copy_stream = rclient
.copy_both_simple::<bytes::Bytes>(&query)
.await
.unwrap();
let copy_stream = rclient.copy_both_simple::<bytes::Bytes>(&query).await?;
let physical_stream = ReplicationStream::new(copy_stream);
tokio::pin!(physical_stream);
@@ -207,14 +198,13 @@ async fn walreceiver_main(
let write_lsn = last_lsn;
let flush_lsn = last_lsn;
let apply_lsn = PgLsn::INVALID;
let ts = PgTimestamp::now().unwrap();
let ts = PgTimestamp::now()?;
const NO_REPLY: u8 = 0u8;
physical_stream
.as_mut()
.standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, NO_REPLY)
.await
.unwrap();
.await?;
}
}
_ => (),
@@ -236,33 +226,36 @@ pub struct IdentifySystem {
dbname: Option<String>,
}
/// There was a problem parsing the response to
/// a postgres IDENTIFY_SYSTEM command.
#[derive(Debug, thiserror::Error)]
#[error("IDENTIFY_SYSTEM parse error")]
pub struct IdentifyError;
/// Run the postgres `IDENTIFY_SYSTEM` command
pub async fn identify_system(client: &tokio_postgres::Client) -> Result<IdentifySystem, Error> {
let query_str = "IDENTIFY_SYSTEM";
let response = client.simple_query(query_str).await?;
// get(N) from row, then parse it as some destination type.
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Option<T>
fn get_parse<T>(row: &SimpleQueryRow, idx: usize) -> Result<T, IdentifyError>
where
T: FromStr,
{
let val = row.get(idx)?;
val.parse::<T>().ok()
let val = row.get(idx).ok_or(IdentifyError)?;
val.parse::<T>().or(Err(IdentifyError))
}
// FIXME: turn unwrap() into errors.
// All of the tokio_postgres::Error builders are private, so we
// can't create them here. We'll just have to create our own error type.
if let SimpleQueryMessage::Row(first_row) = response.get(0).unwrap() {
// extract the row contents into an IdentifySystem struct.
// written as a closure so I can use ? for Option here.
if let Some(SimpleQueryMessage::Row(first_row)) = response.get(0) {
Ok(IdentifySystem {
systemid: get_parse(first_row, 0).unwrap(),
timeline: get_parse(first_row, 1).unwrap(),
xlogpos: get_parse(first_row, 2).unwrap(),
dbname: get_parse(first_row, 3),
systemid: get_parse(first_row, 0)?,
timeline: get_parse(first_row, 1)?,
xlogpos: get_parse(first_row, 2)?,
dbname: get_parse(first_row, 3).ok(),
})
} else {
// FIXME: return an error
panic!("identify_system returned non-row response");
Err(IdentifyError)?
}
}