diff --git a/Cargo.lock b/Cargo.lock index fe5aae6ae8..ab2f69929e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2454,6 +2454,7 @@ dependencies = [ "postgres", "postgres-protocol", "postgres-types", + "postgres_backend", "postgres_connection", "postgres_ffi", "pq_proto", @@ -2676,6 +2677,29 @@ dependencies = [ "postgres-protocol", ] +[[package]] +name = "postgres_backend" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "futures", + "once_cell", + "pq_proto", + "rustls", + "rustls-pemfile", + "serde", + "thiserror", + "tokio", + "tokio-postgres", + "tokio-postgres-rustls", + "tokio-rustls", + "tracing", + "utils", + "workspace_hack", +] + [[package]] name = "postgres_connection" version = "0.1.0" @@ -4507,7 +4531,6 @@ dependencies = [ "bytes", "criterion", "futures", - "git-version", "heapless", "hex", "hex-literal", @@ -4532,7 +4555,6 @@ dependencies = [ "tempfile", "thiserror", "tokio", - "tokio-rustls", "tracing", "tracing-subscriber", "url", diff --git a/Cargo.toml b/Cargo.toml index ea22b04124..bbd4975603 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -133,6 +133,7 @@ heapless = { default-features=false, features=[], git = "https://github.com/japa consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" } metrics = { version = "0.1", path = "./libs/metrics/" } pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" } +postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" } postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" } postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" } pq_proto = { version = "0.1", path = "./libs/pq_proto/" } diff --git a/libs/postgres_backend/Cargo.toml b/libs/postgres_backend/Cargo.toml new file mode 100644 index 0000000000..bead77c4d6 --- /dev/null +++ b/libs/postgres_backend/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "postgres_backend" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[dependencies] +async-trait.workspace = true +anyhow.workspace = true +bytes.workspace = true +futures.workspace = true +rustls.workspace = true +serde.workspace = true +thiserror.workspace = true +tokio.workspace = true +tokio-rustls.workspace = true +tracing.workspace = true + +pq_proto.workspace = true +utils.workspace = true +workspace_hack.workspace = true + +[dev-dependencies] +once_cell.workspace = true +rustls-pemfile.workspace = true +tokio-postgres.workspace = true +tokio-postgres-rustls.workspace = true \ No newline at end of file diff --git a/libs/utils/src/postgres_backend_async.rs b/libs/postgres_backend/src/lib.rs similarity index 99% rename from libs/utils/src/postgres_backend_async.rs rename to libs/postgres_backend/src/lib.rs index 442b06ed01..6e96e65a52 100644 --- a/libs/utils/src/postgres_backend_async.rs +++ b/libs/postgres_backend/src/lib.rs @@ -3,7 +3,6 @@ //! implementation determining how to process the queries. Currently its API //! is rather narrow, but we can extend it once required. -use crate::postgres_backend::AuthType; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; use pq_proto::{BeMessage, ConnectionError, FeMessage, FeStartupPacket, SQLSTATE_INTERNAL_ERROR}; @@ -14,6 +13,7 @@ use std::sync::Arc; use std::task::Poll; use std::{future::Future, task::ready}; use tracing::{debug, error, info, trace}; +use utils::postgres_backend::AuthType; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; use tokio_rustls::TlsAcceptor; @@ -617,7 +617,7 @@ pub fn short_error(e: &QueryError) -> String { } } -pub(super) fn log_query_error(query: &str, e: &QueryError) { +pub fn log_query_error(query: &str, e: &QueryError) { match e { QueryError::Disconnected(ConnectionError::Socket(io_error)) => { if is_expected_io_error(io_error) { diff --git a/libs/utils/Cargo.toml b/libs/utils/Cargo.toml index 6acdb6fa53..206e40fce9 100644 --- a/libs/utils/Cargo.toml +++ b/libs/utils/Cargo.toml @@ -12,42 +12,40 @@ anyhow.workspace = true bincode.workspace = true bytes.workspace = true heapless.workspace = true +hex = { workspace = true, features = ["serde"] } hyper = { workspace = true, features = ["full"] } futures = { workspace = true} -routerify.workspace = true -serde.workspace = true -serde_json.workspace = true -thiserror.workspace = true -tokio.workspace = true -tokio-rustls.workspace = true -tracing.workspace = true -tracing-subscriber = { workspace = true, features = ["json"] } -nix.workspace = true -signal-hook.workspace = true -rand.workspace = true jsonwebtoken.workspace = true -hex = { workspace = true, features = ["serde"] } +nix.workspace = true +once_cell.workspace = true +routerify.workspace = true rustls.workspace = true rustls-split.workspace = true -git-version.workspace = true +serde.workspace = true +serde_json.workspace = true +signal-hook.workspace = true +thiserror.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber = { workspace = true, features = ["json"] } +rand.workspace = true serde_with.workspace = true -once_cell.workspace = true strum.workspace = true strum_macros.workspace = true +url.workspace = true +uuid = { version = "1.2", features = ["v4", "serde"] } metrics.workspace = true pq_proto.workspace = true - workspace_hack.workspace = true -url.workspace = true -uuid = { version = "1.2", features = ["v4", "serde"] } + [dev-dependencies] byteorder.workspace = true bytes.workspace = true -hex-literal.workspace = true -tempfile.workspace = true criterion.workspace = true +hex-literal.workspace = true rustls-pemfile.workspace = true +tempfile.workspace = true [[bench]] name = "benchmarks" diff --git a/libs/utils/src/lib.rs b/libs/utils/src/lib.rs index 9ddd702c72..7408eb66cd 100644 --- a/libs/utils/src/lib.rs +++ b/libs/utils/src/lib.rs @@ -14,7 +14,6 @@ pub mod vec_map; pub mod bin_ser; pub mod postgres_backend; -pub mod postgres_backend_async; // helper functions for creating and fsyncing pub mod crashsafe; diff --git a/libs/utils/src/postgres_backend.rs b/libs/utils/src/postgres_backend.rs index f3e3835bda..fc49aa6696 100644 --- a/libs/utils/src/postgres_backend.rs +++ b/libs/utils/src/postgres_backend.rs @@ -3,11 +3,10 @@ //! implementation determining how to process the queries. Currently its API //! is rather narrow, but we can extend it once required. -use crate::postgres_backend_async::{log_query_error, short_error, QueryError}; use crate::sock_split::{BidiStream, ReadStream, WriteStream}; use anyhow::Context; use bytes::{Bytes, BytesMut}; -use pq_proto::{BeMessage, FeMessage, FeStartupPacket}; +use pq_proto::{BeMessage, ConnectionError, FeMessage, FeStartupPacket, SQLSTATE_INTERNAL_ERROR}; use serde::{Deserialize, Serialize}; use std::fmt; use std::io::{self, Write}; @@ -17,6 +16,41 @@ use std::sync::Arc; use std::time::Duration; use tracing::*; +pub fn is_expected_io_error(e: &io::Error) -> bool { + use io::ErrorKind::*; + matches!( + e.kind(), + ConnectionRefused | ConnectionAborted | ConnectionReset + ) +} + +/// An error, occurred during query processing: +/// either during the connection ([`ConnectionError`]) or before/after it. +#[derive(thiserror::Error, Debug)] +pub enum QueryError { + /// The connection was lost while processing the query. + #[error(transparent)] + Disconnected(#[from] ConnectionError), + /// Some other error + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl From for QueryError { + fn from(e: io::Error) -> Self { + Self::Disconnected(ConnectionError::Socket(e)) + } +} + +impl QueryError { + pub fn pg_error_code(&self) -> &'static [u8; 5] { + match self { + Self::Disconnected(_) => b"08006", // connection failure + Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error + } + } +} + pub trait Handler { /// Handle single query. /// postgres_backend will issue ReadyForQuery after calling this (this @@ -483,3 +517,28 @@ impl PostgresBackend { Ok(ProcessMsgResult::Continue) } } + +pub fn short_error(e: &QueryError) -> String { + match e { + QueryError::Disconnected(connection_error) => connection_error.to_string(), + QueryError::Other(e) => format!("{e:#}"), + } +} + +pub(super) fn log_query_error(query: &str, e: &QueryError) { + match e { + QueryError::Disconnected(ConnectionError::Socket(io_error)) => { + if is_expected_io_error(io_error) { + info!("query handler for '{query}' failed with expected io error: {io_error}"); + } else { + error!("query handler for '{query}' failed with io error: {io_error}"); + } + } + QueryError::Disconnected(other_connection_error) => { + error!("query handler for '{query}' failed with connection error: {other_connection_error:?}") + } + QueryError::Other(e) => { + error!("query handler for '{query}' failed: {e:?}"); + } + } +} diff --git a/libs/utils/tests/ssl_test.rs b/libs/utils/tests/ssl_test.rs index fae707f049..bf09f1b37d 100644 --- a/libs/utils/tests/ssl_test.rs +++ b/libs/utils/tests/ssl_test.rs @@ -10,8 +10,8 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; use once_cell::sync::Lazy; use utils::{ + postgres_backend::QueryError, postgres_backend::{AuthType, Handler, PostgresBackend}, - postgres_backend_async::QueryError, }; fn make_tcp_pair() -> (TcpStream, TcpStream) { diff --git a/pageserver/Cargo.toml b/pageserver/Cargo.toml index d2f0b84863..8d6641a387 100644 --- a/pageserver/Cargo.toml +++ b/pageserver/Cargo.toml @@ -37,6 +37,7 @@ num-traits.workspace = true once_cell.workspace = true pin-project-lite.workspace = true postgres.workspace = true +postgres_backend.workspace = true postgres-protocol.workspace = true postgres-types.workspace = true rand.workspace = true diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index b362e25424..dc4be9dd65 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -20,6 +20,7 @@ use pageserver_api::models::{ PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse, PagestreamNblocksRequest, PagestreamNblocksResponse, }; +use postgres_backend::{self, is_expected_io_error, PostgresBackend, QueryError}; use pq_proto::ConnectionError; use pq_proto::FeStartupPacket; use pq_proto::{BeMessage, FeMessage, RowDescriptor}; @@ -36,7 +37,6 @@ use utils::{ id::{TenantId, TimelineId}, lsn::Lsn, postgres_backend::AuthType, - postgres_backend_async::{self, is_expected_io_error, PostgresBackend, QueryError}, simple_rcu::RcuReadGuard, }; @@ -721,7 +721,7 @@ impl PageServerHandler { } #[async_trait::async_trait] -impl postgres_backend_async::Handler for PageServerHandler { +impl postgres_backend::Handler for PageServerHandler { fn check_auth_jwt( &mut self, _pgb: &mut PostgresBackend, diff --git a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs index 7e06c398af..f9d1e819a1 100644 --- a/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs +++ b/pageserver/src/tenant/timeline/walreceiver/walreceiver_connection.rs @@ -33,10 +33,11 @@ use crate::{ walingest::WalIngest, walrecord::DecodedWALRecord, }; +use postgres_backend::is_expected_io_error; use postgres_connection::PgConnectionConfig; use postgres_ffi::waldecoder::WalStreamDecoder; use pq_proto::ReplicationFeedback; -use utils::{lsn::Lsn, postgres_backend_async::is_expected_io_error}; +use utils::lsn::Lsn; /// Status of the connection. #[derive(Debug, Clone, Copy)] diff --git a/proxy/src/console/mgmt.rs b/proxy/src/console/mgmt.rs index c00c06fbb7..41f370add4 100644 --- a/proxy/src/console/mgmt.rs +++ b/proxy/src/console/mgmt.rs @@ -8,8 +8,8 @@ use pq_proto::{BeMessage, SINGLE_COL_ROWDESC}; use std::{net::TcpStream, thread}; use tracing::{error, info, info_span}; use utils::{ + postgres_backend::QueryError, postgres_backend::{self, AuthType, PostgresBackend}, - postgres_backend_async::QueryError, }; static CPLANE_WAITERS: Lazy> = Lazy::new(Default::default); diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 99f0e90711..d1cd76459b 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -17,7 +17,7 @@ use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID}; use std::str; use tracing::info; use utils::auth::{Claims, Scope}; -use utils::postgres_backend_async::QueryError; +use utils::postgres_backend::QueryError; use utils::{ id::{TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index 32a24a4978..3d102a98d9 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -13,7 +13,7 @@ use bytes::Bytes; use serde::{Deserialize, Serialize}; use tracing::*; use utils::id::TenantTimelineId; -use utils::postgres_backend_async::QueryError; +use utils::postgres_backend::QueryError; use crate::handler::SafekeeperPostgresHandler; use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo}; diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 671e5470a0..0cf921d97a 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -8,7 +8,7 @@ use anyhow::Context; use bytes::BytesMut; use tracing::*; use utils::lsn::Lsn; -use utils::postgres_backend_async::QueryError; +use utils::postgres_backend::QueryError; use crate::safekeeper::ServerInfo; use crate::timeline::Timeline; diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index 411d0708b5..169ab03f0a 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -16,7 +16,7 @@ use std::net::Shutdown; use std::sync::Arc; use std::time::Duration; use std::{io, str, thread}; -use utils::postgres_backend_async::QueryError; +use utils::postgres_backend::QueryError; use pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody}; use tokio::sync::watch::Receiver; diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index 3ca651d060..40448be949 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -6,7 +6,7 @@ use regex::Regex; use std::net::{TcpListener, TcpStream}; use std::thread; use tracing::*; -use utils::postgres_backend_async::QueryError; +use utils::postgres_backend::QueryError; use crate::handler::SafekeeperPostgresHandler; use crate::SafeKeeperConf;