diff --git a/Cargo.lock b/Cargo.lock index 8ef2a6be00..8847c2095e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3030,6 +3030,7 @@ dependencies = [ "chrono", "clap", "consumption_metrics", + "fallible-iterator", "futures", "git-version", "hashbrown 0.13.2", diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index dfd2336f73..8deeb44d2a 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -69,6 +69,8 @@ postgres-native-tls.workspace = true workspace_hack.workspace = true tokio-util.workspace = true +fallible-iterator = "0.2.0" + [dev-dependencies] rcgen.workspace = true rstest.workspace = true diff --git a/proxy/src/http/sql_over_http.rs b/proxy/src/http/sql_over_http.rs index a4eb13a10b..8a04f43516 100644 --- a/proxy/src/http/sql_over_http.rs +++ b/proxy/src/http/sql_over_http.rs @@ -1,6 +1,8 @@ +use std::io::ErrorKind; use std::sync::Arc; use anyhow::bail; +use fallible_iterator::FallibleIterator; use futures::pin_mut; use futures::StreamExt; use hashbrown::HashMap; @@ -16,11 +18,20 @@ use tokio_postgres::types::Type; use tokio_postgres::GenericClient; use tokio_postgres::IsolationLevel; use tokio_postgres::Row; +use tokio_postgres::RowStream; +use tokio_postgres::Statement; use url::Url; +use crate::http::sql_over_http::codec::FrontendMessage; +use crate::http::sql_over_http::connection::RequestMessages; + use super::conn_pool::ConnInfo; use super::conn_pool::GlobalConnPool; +mod codec; +mod connection; +mod error; + #[derive(serde::Deserialize)] struct QueryData { query: String, @@ -351,6 +362,98 @@ async fn query_to_json( })) } +/// Pass text directly to the Postgres backend to allow it to sort out typing itself and +/// to save a roundtrip +pub async fn query_raw_txt<'a, S, I>(&self, query: S, params: I) -> Result +where + S: AsRef, + I: IntoIterator>, + I::IntoIter: ExactSizeIterator, +{ + use postgres_protocol::message::backend::Message; + use postgres_protocol::message::frontend; + + let params = params.into_iter(); + let params_len = params.len(); + + let buf = self.inner.with_buf(|buf| { + // Parse, anonymous portal + frontend::parse("", query.as_ref(), std::iter::empty(), buf) + .map_err(error::Error::encode)?; + // Bind, pass params as text, retrieve as binary + match frontend::bind( + "", // empty string selects the unnamed portal + "", // empty string selects the unnamed prepared statement + std::iter::empty(), // all parameters use the default format (text) + params, + |param, buf| match param { + Some(param) => { + buf.put_slice(param.as_ref().as_bytes()); + Ok(postgres_protocol::IsNull::No) + } + None => Ok(postgres_protocol::IsNull::Yes), + }, + Some(0), // all text + buf, + ) { + Ok(()) => Ok(()), + Err(frontend::BindError::Conversion(e)) => Err(error::Error::encode( + std::io::Error::new(ErrorKind::Other, e), + )), + Err(frontend::BindError::Serialization(e)) => Err(error::Error::encode(e)), + }?; + + // Describe portal to typecast results + frontend::describe(b'P', "", buf).map_err(error::Error::encode)?; + // Execute + frontend::execute("", 0, buf).map_err(error::Error::encode)?; + // Sync + frontend::sync(buf); + + Ok(buf.split().freeze()) + })?; + + let mut responses = self + .inner + .send(RequestMessages::Single(FrontendMessage::Raw(buf)))?; + + // now read the responses + + match responses.next().await? { + Message::ParseComplete => {} + _ => return Err(error::Error::unexpected_message()), + } + match responses.next().await? { + Message::BindComplete => {} + _ => return Err(error::Error::unexpected_message()), + } + let row_description = match responses.next().await? { + Message::RowDescription(body) => Some(body), + Message::NoData => None, + _ => return Err(error::Error::unexpected_message()), + }; + + // construct statement object + + let parameters = vec![Type::UNKNOWN; params_len]; + + let mut columns = vec![]; + if let Some(row_description) = row_description { + let mut it = row_description.fields(); + while let Some(field) = it.next().map_err(Error::parse)? { + // NB: for some types that function may send a query to the server. At least in + // raw text mode we don't need that info and can skip this. + let type_ = get_type(&self.inner, field.type_oid()).await?; + let column = Column::new(field.name().to_string(), type_, field); + columns.push(column); + } + } + + let statement = Statement::new_text(&self.inner, "".to_owned(), parameters, columns); + + Ok(RowStream::new(statement, responses)) +} + // // Convert postgres row with text-encoded values to JSON object // diff --git a/proxy/src/http/sql_over_http/codec.rs b/proxy/src/http/sql_over_http/codec.rs new file mode 100644 index 0000000000..fbfe7871bc --- /dev/null +++ b/proxy/src/http/sql_over_http/codec.rs @@ -0,0 +1,108 @@ +use bytes::{Bytes, BytesMut}; +use fallible_iterator::FallibleIterator; +use postgres_protocol::message::backend; +use std::io; +use tokio_util::codec::{Decoder, Encoder}; + +pub enum FrontendMessage { + Raw(Bytes), + // CopyData(CopyData>), +} + +pub enum BackendMessage { + Normal { + messages: BackendMessages, + request_complete: bool, + }, + Async(backend::Message), +} + +pub struct BackendMessages(BytesMut); + +impl BackendMessages { + pub fn empty() -> BackendMessages { + BackendMessages(BytesMut::new()) + } +} + +impl FallibleIterator for BackendMessages { + type Item = backend::Message; + type Error = io::Error; + + fn next(&mut self) -> io::Result> { + backend::Message::parse(&mut self.0) + } +} + +pub struct PostgresCodec { + pub max_message_size: Option, +} + +impl Encoder for PostgresCodec { + type Error = io::Error; + + fn encode(&mut self, item: FrontendMessage, dst: &mut BytesMut) -> io::Result<()> { + match item { + FrontendMessage::Raw(buf) => dst.extend_from_slice(&buf), + // FrontendMessage::CopyData(data) => data.write(dst), + } + + Ok(()) + } +} + +impl Decoder for PostgresCodec { + type Item = BackendMessage; + type Error = io::Error; + + fn decode(&mut self, src: &mut BytesMut) -> Result, io::Error> { + let mut idx = 0; + let mut request_complete = false; + + while let Some(header) = backend::Header::parse(&src[idx..])? { + let len = header.len() as usize + 1; + if src[idx..].len() < len { + break; + } + + if let Some(max) = self.max_message_size { + if len > max { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "message too large", + )); + } + } + + match header.tag() { + backend::NOTICE_RESPONSE_TAG + | backend::NOTIFICATION_RESPONSE_TAG + | backend::PARAMETER_STATUS_TAG => { + if idx == 0 { + let message = backend::Message::parse(src)?.unwrap(); + return Ok(Some(BackendMessage::Async(message))); + } else { + break; + } + } + _ => {} + } + + idx += len; + + if header.tag() == backend::READY_FOR_QUERY_TAG { + request_complete = true; + break; + } + } + + if idx == 0 { + Ok(None) + } else { + Ok(Some(BackendMessage::Normal { + messages: BackendMessages(src.split_to(idx)), + request_complete, + })) + } + } +} diff --git a/proxy/src/http/sql_over_http/connection.rs b/proxy/src/http/sql_over_http/connection.rs new file mode 100644 index 0000000000..75636cffed --- /dev/null +++ b/proxy/src/http/sql_over_http/connection.rs @@ -0,0 +1,360 @@ +use super::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; +use super::error::Error; +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; +use futures::channel::mpsc; +use futures::{stream::FusedStream, Sink, Stream, StreamExt}; +use postgres_protocol::message::backend::Message; +use postgres_protocol::message::frontend; +use std::collections::{HashMap, VecDeque}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_postgres::maybe_tls_stream::MaybeTlsStream; +use tokio_util::codec::Framed; +use tracing::trace; + +pub enum RequestMessages { + Single(FrontendMessage), + // CopyIn(CopyInReceiver), + // CopyBoth(CopyBothReceiver), +} + +pub struct Request { + pub messages: RequestMessages, + pub sender: mpsc::Sender, +} + +pub struct Response { + sender: mpsc::Sender, +} + +#[derive(PartialEq, Debug)] +enum State { + Active, + Terminating, + Closing, +} + +/// A connection to a PostgreSQL database. +/// +/// This is one half of what is returned when a new connection is established. It performs the actual IO with the +/// server, and should generally be spawned off onto an executor to run in the background. +/// +/// `Connection` implements `Future`, and only resolves when the connection is closed, either because a fatal error has +/// occurred, or because its associated `Client` has dropped and all outstanding work has completed. +#[must_use = "futures do nothing unless polled"] +pub struct Connection { + /// HACK: we need this in the Neon Proxy. + pub stream: Framed, PostgresCodec>, + /// HACK: we need this in the Neon Proxy to forward params. + pub parameters: HashMap, + receiver: mpsc::UnboundedReceiver, + pending_request: Option, + pending_responses: VecDeque, + responses: VecDeque, + state: State, +} + +impl Connection +where + S: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin, +{ + pub(crate) fn new( + stream: Framed, PostgresCodec>, + pending_responses: VecDeque, + parameters: HashMap, + receiver: mpsc::UnboundedReceiver, + ) -> Connection { + Connection { + stream, + parameters, + receiver, + pending_request: None, + pending_responses, + responses: VecDeque::new(), + state: State::Active, + } + } + + fn poll_response( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + if let Some(message) = self.pending_responses.pop_front() { + trace!("retrying pending response"); + return Poll::Ready(Some(Ok(message))); + } + + Pin::new(&mut self.stream) + .poll_next(cx) + .map(|o| o.map(|r| r.map_err(Error::io))) + } + + fn poll_read(&mut self, cx: &mut Context<'_>) -> Result, Error> { + if self.state != State::Active { + trace!("poll_read: done"); + return Ok(None); + } + + loop { + let message = match self.poll_response(cx)? { + Poll::Ready(Some(message)) => message, + Poll::Ready(None) => return Err(Error::closed()), + Poll::Pending => { + trace!("poll_read: waiting on response"); + return Ok(None); + } + }; + + let (mut messages, request_complete) = match message { + BackendMessage::Async(Message::NoticeResponse(body)) => { + // TODO: log this + + // let error = DbError::parse(&mut body.fields()).map_err(Error::parse)?; + // return Ok(Some(AsyncMessage::Notice(error))); + continue; + } + BackendMessage::Async(Message::NotificationResponse(body)) => { + // TODO: log this + + // let notification = Notification { + // process_id: body.process_id(), + // channel: body.channel().map_err(Error::parse)?.to_string(), + // payload: body.message().map_err(Error::parse)?.to_string(), + // }; + // return Ok(Some(AsyncMessage::Notification(notification))); + continue; + } + BackendMessage::Async(Message::ParameterStatus(body)) => { + self.parameters.insert( + body.name().map_err(Error::parse)?.to_string(), + body.value().map_err(Error::parse)?.to_string(), + ); + continue; + } + BackendMessage::Async(_) => unreachable!(), + BackendMessage::Normal { + messages, + request_complete, + } => (messages, request_complete), + }; + + let mut response = match self.responses.pop_front() { + Some(response) => response, + None => match messages.next().map_err(Error::parse)? { + Some(Message::ErrorResponse(error)) => return Err(Error::db(error)), + _ => return Err(Error::unexpected_message()), + }, + }; + + match response.sender.poll_ready(cx) { + Poll::Ready(Ok(())) => { + let _ = response.sender.start_send(messages); + if !request_complete { + self.responses.push_front(response); + } + } + Poll::Ready(Err(_)) => { + // we need to keep paging through the rest of the messages even if the receiver's hung up + if !request_complete { + self.responses.push_front(response); + } + } + Poll::Pending => { + self.responses.push_front(response); + self.pending_responses.push_back(BackendMessage::Normal { + messages, + request_complete, + }); + trace!("poll_read: waiting on sender"); + return Ok(None); + } + } + } + } + + fn poll_request(&mut self, cx: &mut Context<'_>) -> Poll> { + if let Some(messages) = self.pending_request.take() { + trace!("retrying pending request"); + return Poll::Ready(Some(messages)); + } + + if self.receiver.is_terminated() { + return Poll::Ready(None); + } + + match self.receiver.poll_next_unpin(cx) { + Poll::Ready(Some(request)) => { + trace!("polled new request"); + self.responses.push_back(Response { + sender: request.sender, + }); + Poll::Ready(Some(request.messages)) + } + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } + + fn poll_write(&mut self, cx: &mut Context<'_>) -> Result { + loop { + if self.state == State::Closing { + trace!("poll_write: done"); + return Ok(false); + } + + if Pin::new(&mut self.stream) + .poll_ready(cx) + .map_err(Error::io)? + .is_pending() + { + trace!("poll_write: waiting on socket"); + return Ok(false); + } + + let request = match self.poll_request(cx) { + Poll::Ready(Some(request)) => request, + Poll::Ready(None) if self.responses.is_empty() && self.state == State::Active => { + trace!("poll_write: at eof, terminating"); + self.state = State::Terminating; + let mut request = BytesMut::new(); + frontend::terminate(&mut request); + RequestMessages::Single(FrontendMessage::Raw(request.freeze())) + } + Poll::Ready(None) => { + trace!( + "poll_write: at eof, pending responses {}", + self.responses.len() + ); + return Ok(true); + } + Poll::Pending => { + trace!("poll_write: waiting on request"); + return Ok(true); + } + }; + + match request { + RequestMessages::Single(request) => { + Pin::new(&mut self.stream) + .start_send(request) + .map_err(Error::io)?; + if self.state == State::Terminating { + trace!("poll_write: sent eof, closing"); + self.state = State::Closing; + } + } // RequestMessages::CopyIn(mut receiver) => { + // let message = match receiver.poll_next_unpin(cx) { + // Poll::Ready(Some(message)) => message, + // Poll::Ready(None) => { + // trace!("poll_write: finished copy_in request"); + // continue; + // } + // Poll::Pending => { + // trace!("poll_write: waiting on copy_in stream"); + // self.pending_request = Some(RequestMessages::CopyIn(receiver)); + // return Ok(true); + // } + // }; + // Pin::new(&mut self.stream) + // .start_send(message) + // .map_err(Error::io)?; + // self.pending_request = Some(RequestMessages::CopyIn(receiver)); + // } + // RequestMessages::CopyBoth(mut receiver) => { + // let message = match receiver.poll_next_unpin(cx) { + // Poll::Ready(Some(message)) => message, + // Poll::Ready(None) => { + // trace!("poll_write: finished copy_both request"); + // continue; + // } + // Poll::Pending => { + // trace!("poll_write: waiting on copy_both stream"); + // self.pending_request = Some(RequestMessages::CopyBoth(receiver)); + // return Ok(true); + // } + // }; + // Pin::new(&mut self.stream) + // .start_send(message) + // .map_err(Error::io)?; + // self.pending_request = Some(RequestMessages::CopyBoth(receiver)); + // } + } + } + } + + fn poll_flush(&mut self, cx: &mut Context<'_>) -> Result<(), Error> { + match Pin::new(&mut self.stream) + .poll_flush(cx) + .map_err(Error::io)? + { + Poll::Ready(()) => trace!("poll_flush: flushed"), + Poll::Pending => trace!("poll_flush: waiting on socket"), + } + Ok(()) + } + + fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll> { + if self.state != State::Closing { + return Poll::Pending; + } + + match Pin::new(&mut self.stream) + .poll_close(cx) + .map_err(Error::io)? + { + Poll::Ready(()) => { + trace!("poll_shutdown: complete"); + Poll::Ready(Ok(())) + } + Poll::Pending => { + trace!("poll_shutdown: waiting on socket"); + Poll::Pending + } + } + } + + /// Returns the value of a runtime parameter for this connection. + pub fn parameter(&self, name: &str) -> Option<&str> { + self.parameters.get(name).map(|s| &**s) + } + + /// Polls for asynchronous messages from the server. + /// + /// The server can send notices as well as notifications asynchronously to the client. Applications that wish to + /// examine those messages should use this method to drive the connection rather than its `Future` implementation. + pub fn poll_message(&mut self, cx: &mut Context<'_>) -> Poll>> { + let message = self.poll_read(cx)?; + let want_flush = self.poll_write(cx)?; + if want_flush { + self.poll_flush(cx)?; + } + match message { + Some(message) => Poll::Ready(Some(Ok(message))), + None => match self.poll_shutdown(cx) { + Poll::Ready(Ok(())) => Poll::Ready(None), + Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))), + Poll::Pending => Poll::Pending, + }, + } + } +} + +// impl Future for Connection +// where +// S: AsyncRead + AsyncWrite + Unpin, +// T: AsyncRead + AsyncWrite + Unpin, +// { +// type Output = Result<(), Error>; + +// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { +// while let Some(message) = ready!(self.poll_message(cx)?) { +// if let AsyncMessage::Notice(notice) = message { +// info!("{}: {}", notice.severity(), notice.message()); +// } +// } +// Poll::Ready(Ok(())) +// } +// } diff --git a/proxy/src/http/sql_over_http/error.rs b/proxy/src/http/sql_over_http/error.rs new file mode 100644 index 0000000000..f77d87f59f --- /dev/null +++ b/proxy/src/http/sql_over_http/error.rs @@ -0,0 +1,424 @@ +use std::{error, fmt, io}; + +use postgres_protocol::message::backend::{ErrorFields, ErrorResponseBody}; +use tokio_postgres::error::{SqlState, ErrorPosition}; +use fallible_iterator::FallibleIterator; + +#[derive(Debug, PartialEq)] +enum Kind { + Io, + UnexpectedMessage, + Closed, + Db, + Parse, + Encode, +} + +struct ErrorInner { + kind: Kind, + cause: Option>, +} + +/// An error communicating with the Postgres server. +pub struct Error(Box); + +impl fmt::Debug for Error { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Error") + .field("kind", &self.0.kind) + .field("cause", &self.0.cause) + .finish() + } +} + +impl fmt::Display for Error { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.0.kind { + Kind::Io => fmt.write_str("error communicating with the server")?, + Kind::UnexpectedMessage => fmt.write_str("unexpected message from server")?, + Kind::Closed => fmt.write_str("connection closed")?, + Kind::Db => fmt.write_str("db error")?, + Kind::Parse => fmt.write_str("error parsing response from server")?, + Kind::Encode => fmt.write_str("error encoding message to server")?, + }; + if let Some(ref cause) = self.0.cause { + write!(fmt, ": {}", cause)?; + } + Ok(()) + } +} + +impl error::Error for Error { + fn source(&self) -> Option<&(dyn error::Error + 'static)> { + self.0.cause.as_ref().map(|e| &**e as _) + } +} + +impl Error { + /// Consumes the error, returning its cause. + pub fn into_source(self) -> Option> { + self.0.cause + } + + /// Returns the source of this error if it was a `DbError`. + /// + /// This is a simple convenience method. + pub fn as_db_error(&self) -> Option<&DbError> { + error::Error::source(self).and_then(|e| e.downcast_ref::()) + } + + /// Determines if the error was associated with closed connection. + pub fn is_closed(&self) -> bool { + self.0.kind == Kind::Closed + } + + /// Returns the SQLSTATE error code associated with the error. + /// + /// This is a convenience method that downcasts the cause to a `DbError` and returns its code. + pub fn code(&self) -> Option<&SqlState> { + self.as_db_error().map(DbError::code) + } + + fn new(kind: Kind, cause: Option>) -> Error { + Error(Box::new(ErrorInner { kind, cause })) + } + + #[allow(clippy::needless_pass_by_value)] + pub(crate) fn db(error: ErrorResponseBody) -> Error { + match DbError::parse(&mut error.fields()) { + Ok(e) => Error::new(Kind::Db, Some(Box::new(e))), + Err(e) => Error::new(Kind::Parse, Some(Box::new(e))), + } + } + + pub(crate) fn closed() -> Error { + Error::new(Kind::Closed, None) + } + + pub(crate) fn unexpected_message() -> Error { + Error::new(Kind::UnexpectedMessage, None) + } + + pub(crate) fn parse(e: io::Error) -> Error { + Error::new(Kind::Parse, Some(Box::new(e))) + } + + pub(crate) fn encode(e: io::Error) -> Error { + Error::new(Kind::Encode, Some(Box::new(e))) + } + + pub(crate) fn io(e: io::Error) -> Error { + Error::new(Kind::Io, Some(Box::new(e))) + } +} + +/// The severity of a Postgres error or notice. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +pub enum Severity { + /// PANIC + Panic, + /// FATAL + Fatal, + /// ERROR + Error, + /// WARNING + Warning, + /// NOTICE + Notice, + /// DEBUG + Debug, + /// INFO + Info, + /// LOG + Log, +} + +impl fmt::Display for Severity { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match *self { + Severity::Panic => "PANIC", + Severity::Fatal => "FATAL", + Severity::Error => "ERROR", + Severity::Warning => "WARNING", + Severity::Notice => "NOTICE", + Severity::Debug => "DEBUG", + Severity::Info => "INFO", + Severity::Log => "LOG", + }; + fmt.write_str(s) + } +} + +impl Severity { + fn from_str(s: &str) -> Option { + match s { + "PANIC" => Some(Severity::Panic), + "FATAL" => Some(Severity::Fatal), + "ERROR" => Some(Severity::Error), + "WARNING" => Some(Severity::Warning), + "NOTICE" => Some(Severity::Notice), + "DEBUG" => Some(Severity::Debug), + "INFO" => Some(Severity::Info), + "LOG" => Some(Severity::Log), + _ => None, + } + } +} + +/// A Postgres error or notice. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DbError { + severity: String, + parsed_severity: Option, + code: SqlState, + message: String, + detail: Option, + hint: Option, + position: Option, + where_: Option, + schema: Option, + table: Option, + column: Option, + datatype: Option, + constraint: Option, + file: Option, + line: Option, + routine: Option, +} + +impl DbError { + pub(crate) fn parse(fields: &mut ErrorFields<'_>) -> io::Result { + let mut severity = None; + let mut parsed_severity = None; + let mut code = None; + let mut message = None; + let mut detail = None; + let mut hint = None; + let mut normal_position = None; + let mut internal_position = None; + let mut internal_query = None; + let mut where_ = None; + let mut schema = None; + let mut table = None; + let mut column = None; + let mut datatype = None; + let mut constraint = None; + let mut file = None; + let mut line = None; + let mut routine = None; + + while let Some(field) = fields.next()? { + match field.type_() { + b'S' => severity = Some(field.value().to_owned()), + b'C' => code = Some(SqlState::from_code(field.value())), + b'M' => message = Some(field.value().to_owned()), + b'D' => detail = Some(field.value().to_owned()), + b'H' => hint = Some(field.value().to_owned()), + b'P' => { + normal_position = Some(field.value().parse::().map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "`P` field did not contain an integer", + ) + })?); + } + b'p' => { + internal_position = Some(field.value().parse::().map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "`p` field did not contain an integer", + ) + })?); + } + b'q' => internal_query = Some(field.value().to_owned()), + b'W' => where_ = Some(field.value().to_owned()), + b's' => schema = Some(field.value().to_owned()), + b't' => table = Some(field.value().to_owned()), + b'c' => column = Some(field.value().to_owned()), + b'd' => datatype = Some(field.value().to_owned()), + b'n' => constraint = Some(field.value().to_owned()), + b'F' => file = Some(field.value().to_owned()), + b'L' => { + line = Some(field.value().parse::().map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "`L` field did not contain an integer", + ) + })?); + } + b'R' => routine = Some(field.value().to_owned()), + b'V' => { + parsed_severity = Some(Severity::from_str(field.value()).ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "`V` field contained an invalid value", + ) + })?); + } + _ => {} + } + } + + Ok(DbError { + severity: severity + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "`S` field missing"))?, + parsed_severity, + code: code + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "`C` field missing"))?, + message: message + .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "`M` field missing"))?, + detail, + hint, + position: match normal_position { + Some(position) => Some(ErrorPosition::Original(position)), + None => match internal_position { + Some(position) => Some(ErrorPosition::Internal { + position, + query: internal_query.ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidInput, + "`q` field missing but `p` field present", + ) + })?, + }), + None => None, + }, + }, + where_, + schema, + table, + column, + datatype, + constraint, + file, + line, + routine, + }) + } + + /// The field contents are ERROR, FATAL, or PANIC (in an error message), + /// or WARNING, NOTICE, DEBUG, INFO, or LOG (in a notice message), or a + /// localized translation of one of these. + pub fn severity(&self) -> &str { + &self.severity + } + + /// A parsed, nonlocalized version of `severity`. (PostgreSQL 9.6+) + pub fn parsed_severity(&self) -> Option { + self.parsed_severity + } + + /// The SQLSTATE code for the error. + pub fn code(&self) -> &SqlState { + &self.code + } + + /// The primary human-readable error message. + /// + /// This should be accurate but terse (typically one line). + pub fn message(&self) -> &str { + &self.message + } + + /// An optional secondary error message carrying more detail about the + /// problem. + /// + /// Might run to multiple lines. + pub fn detail(&self) -> Option<&str> { + self.detail.as_deref() + } + + /// An optional suggestion what to do about the problem. + /// + /// This is intended to differ from `detail` in that it offers advice + /// (potentially inappropriate) rather than hard facts. Might run to + /// multiple lines. + pub fn hint(&self) -> Option<&str> { + self.hint.as_deref() + } + + /// An optional error cursor position into either the original query string + /// or an internally generated query. + pub fn position(&self) -> Option<&ErrorPosition> { + self.position.as_ref() + } + + /// An indication of the context in which the error occurred. + /// + /// Presently this includes a call stack traceback of active procedural + /// language functions and internally-generated queries. The trace is one + /// entry per line, most recent first. + pub fn where_(&self) -> Option<&str> { + self.where_.as_deref() + } + + /// If the error was associated with a specific database object, the name + /// of the schema containing that object, if any. (PostgreSQL 9.3+) + pub fn schema(&self) -> Option<&str> { + self.schema.as_deref() + } + + /// If the error was associated with a specific table, the name of the + /// table. (Refer to the schema name field for the name of the table's + /// schema.) (PostgreSQL 9.3+) + pub fn table(&self) -> Option<&str> { + self.table.as_deref() + } + + /// If the error was associated with a specific table column, the name of + /// the column. + /// + /// (Refer to the schema and table name fields to identify the table.) + /// (PostgreSQL 9.3+) + pub fn column(&self) -> Option<&str> { + self.column.as_deref() + } + + /// If the error was associated with a specific data type, the name of the + /// data type. (Refer to the schema name field for the name of the data + /// type's schema.) (PostgreSQL 9.3+) + pub fn datatype(&self) -> Option<&str> { + self.datatype.as_deref() + } + + /// If the error was associated with a specific constraint, the name of the + /// constraint. + /// + /// Refer to fields listed above for the associated table or domain. + /// (For this purpose, indexes are treated as constraints, even if they + /// weren't created with constraint syntax.) (PostgreSQL 9.3+) + pub fn constraint(&self) -> Option<&str> { + self.constraint.as_deref() + } + + /// The file name of the source-code location where the error was reported. + pub fn file(&self) -> Option<&str> { + self.file.as_deref() + } + + /// The line number of the source-code location where the error was + /// reported. + pub fn line(&self) -> Option { + self.line + } + + /// The name of the source-code routine reporting the error. + pub fn routine(&self) -> Option<&str> { + self.routine.as_deref() + } +} + +impl fmt::Display for DbError { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(fmt, "{}: {}", self.severity, self.message)?; + if let Some(detail) = &self.detail { + write!(fmt, "\nDETAIL: {}", detail)?; + } + if let Some(hint) = &self.hint { + write!(fmt, "\nHINT: {}", hint)?; + } + Ok(()) + } +} + +impl error::Error for DbError {}