Move async postgres_backend to its own crate.

To untie cyclic dependency between sync and async versions of postgres_backend,
copy QueryError and some logging/error routines to postgres_backend.rs. This is
temporal glue to make commits smaller, sync version will be dropped by the
upcoming commit completely.
This commit is contained in:
Arseny Sher
2023-03-07 10:08:57 +04:00
committed by Arseny Sher
parent 3f11a647c0
commit 7627d85345
17 changed files with 144 additions and 36 deletions

26
Cargo.lock generated
View File

@@ -2454,6 +2454,7 @@ dependencies = [
"postgres",
"postgres-protocol",
"postgres-types",
"postgres_backend",
"postgres_connection",
"postgres_ffi",
"pq_proto",
@@ -2676,6 +2677,29 @@ dependencies = [
"postgres-protocol",
]
[[package]]
name = "postgres_backend"
version = "0.1.0"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"futures",
"once_cell",
"pq_proto",
"rustls",
"rustls-pemfile",
"serde",
"thiserror",
"tokio",
"tokio-postgres",
"tokio-postgres-rustls",
"tokio-rustls",
"tracing",
"utils",
"workspace_hack",
]
[[package]]
name = "postgres_connection"
version = "0.1.0"
@@ -4507,7 +4531,6 @@ dependencies = [
"bytes",
"criterion",
"futures",
"git-version",
"heapless",
"hex",
"hex-literal",
@@ -4532,7 +4555,6 @@ dependencies = [
"tempfile",
"thiserror",
"tokio",
"tokio-rustls",
"tracing",
"tracing-subscriber",
"url",

View File

@@ -133,6 +133,7 @@ heapless = { default-features=false, features=[], git = "https://github.com/japa
consumption_metrics = { version = "0.1", path = "./libs/consumption_metrics/" }
metrics = { version = "0.1", path = "./libs/metrics/" }
pageserver_api = { version = "0.1", path = "./libs/pageserver_api/" }
postgres_backend = { version = "0.1", path = "./libs/postgres_backend/" }
postgres_connection = { version = "0.1", path = "./libs/postgres_connection/" }
postgres_ffi = { version = "0.1", path = "./libs/postgres_ffi/" }
pq_proto = { version = "0.1", path = "./libs/pq_proto/" }

View File

@@ -0,0 +1,27 @@
[package]
name = "postgres_backend"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[dependencies]
async-trait.workspace = true
anyhow.workspace = true
bytes.workspace = true
futures.workspace = true
rustls.workspace = true
serde.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
pq_proto.workspace = true
utils.workspace = true
workspace_hack.workspace = true
[dev-dependencies]
once_cell.workspace = true
rustls-pemfile.workspace = true
tokio-postgres.workspace = true
tokio-postgres-rustls.workspace = true

View File

@@ -3,7 +3,6 @@
//! implementation determining how to process the queries. Currently its API
//! is rather narrow, but we can extend it once required.
use crate::postgres_backend::AuthType;
use anyhow::Context;
use bytes::{Buf, Bytes, BytesMut};
use pq_proto::{BeMessage, ConnectionError, FeMessage, FeStartupPacket, SQLSTATE_INTERNAL_ERROR};
@@ -14,6 +13,7 @@ use std::sync::Arc;
use std::task::Poll;
use std::{future::Future, task::ready};
use tracing::{debug, error, info, trace};
use utils::postgres_backend::AuthType;
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio_rustls::TlsAcceptor;
@@ -617,7 +617,7 @@ pub fn short_error(e: &QueryError) -> String {
}
}
pub(super) fn log_query_error(query: &str, e: &QueryError) {
pub fn log_query_error(query: &str, e: &QueryError) {
match e {
QueryError::Disconnected(ConnectionError::Socket(io_error)) => {
if is_expected_io_error(io_error) {

View File

@@ -12,42 +12,40 @@ anyhow.workspace = true
bincode.workspace = true
bytes.workspace = true
heapless.workspace = true
hex = { workspace = true, features = ["serde"] }
hyper = { workspace = true, features = ["full"] }
futures = { workspace = true}
routerify.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tokio-rustls.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
nix.workspace = true
signal-hook.workspace = true
rand.workspace = true
jsonwebtoken.workspace = true
hex = { workspace = true, features = ["serde"] }
nix.workspace = true
once_cell.workspace = true
routerify.workspace = true
rustls.workspace = true
rustls-split.workspace = true
git-version.workspace = true
serde.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["json"] }
rand.workspace = true
serde_with.workspace = true
once_cell.workspace = true
strum.workspace = true
strum_macros.workspace = true
url.workspace = true
uuid = { version = "1.2", features = ["v4", "serde"] }
metrics.workspace = true
pq_proto.workspace = true
workspace_hack.workspace = true
url.workspace = true
uuid = { version = "1.2", features = ["v4", "serde"] }
[dev-dependencies]
byteorder.workspace = true
bytes.workspace = true
hex-literal.workspace = true
tempfile.workspace = true
criterion.workspace = true
hex-literal.workspace = true
rustls-pemfile.workspace = true
tempfile.workspace = true
[[bench]]
name = "benchmarks"

View File

@@ -14,7 +14,6 @@ pub mod vec_map;
pub mod bin_ser;
pub mod postgres_backend;
pub mod postgres_backend_async;
// helper functions for creating and fsyncing
pub mod crashsafe;

View File

@@ -3,11 +3,10 @@
//! implementation determining how to process the queries. Currently its API
//! is rather narrow, but we can extend it once required.
use crate::postgres_backend_async::{log_query_error, short_error, QueryError};
use crate::sock_split::{BidiStream, ReadStream, WriteStream};
use anyhow::Context;
use bytes::{Bytes, BytesMut};
use pq_proto::{BeMessage, FeMessage, FeStartupPacket};
use pq_proto::{BeMessage, ConnectionError, FeMessage, FeStartupPacket, SQLSTATE_INTERNAL_ERROR};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::io::{self, Write};
@@ -17,6 +16,41 @@ use std::sync::Arc;
use std::time::Duration;
use tracing::*;
pub fn is_expected_io_error(e: &io::Error) -> bool {
use io::ErrorKind::*;
matches!(
e.kind(),
ConnectionRefused | ConnectionAborted | ConnectionReset
)
}
/// An error, occurred during query processing:
/// either during the connection ([`ConnectionError`]) or before/after it.
#[derive(thiserror::Error, Debug)]
pub enum QueryError {
/// The connection was lost while processing the query.
#[error(transparent)]
Disconnected(#[from] ConnectionError),
/// Some other error
#[error(transparent)]
Other(#[from] anyhow::Error),
}
impl From<io::Error> for QueryError {
fn from(e: io::Error) -> Self {
Self::Disconnected(ConnectionError::Socket(e))
}
}
impl QueryError {
pub fn pg_error_code(&self) -> &'static [u8; 5] {
match self {
Self::Disconnected(_) => b"08006", // connection failure
Self::Other(_) => SQLSTATE_INTERNAL_ERROR, // internal error
}
}
}
pub trait Handler {
/// Handle single query.
/// postgres_backend will issue ReadyForQuery after calling this (this
@@ -483,3 +517,28 @@ impl PostgresBackend {
Ok(ProcessMsgResult::Continue)
}
}
pub fn short_error(e: &QueryError) -> String {
match e {
QueryError::Disconnected(connection_error) => connection_error.to_string(),
QueryError::Other(e) => format!("{e:#}"),
}
}
pub(super) fn log_query_error(query: &str, e: &QueryError) {
match e {
QueryError::Disconnected(ConnectionError::Socket(io_error)) => {
if is_expected_io_error(io_error) {
info!("query handler for '{query}' failed with expected io error: {io_error}");
} else {
error!("query handler for '{query}' failed with io error: {io_error}");
}
}
QueryError::Disconnected(other_connection_error) => {
error!("query handler for '{query}' failed with connection error: {other_connection_error:?}")
}
QueryError::Other(e) => {
error!("query handler for '{query}' failed: {e:?}");
}
}
}

View File

@@ -10,8 +10,8 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use once_cell::sync::Lazy;
use utils::{
postgres_backend::QueryError,
postgres_backend::{AuthType, Handler, PostgresBackend},
postgres_backend_async::QueryError,
};
fn make_tcp_pair() -> (TcpStream, TcpStream) {

View File

@@ -37,6 +37,7 @@ num-traits.workspace = true
once_cell.workspace = true
pin-project-lite.workspace = true
postgres.workspace = true
postgres_backend.workspace = true
postgres-protocol.workspace = true
postgres-types.workspace = true
rand.workspace = true

View File

@@ -20,6 +20,7 @@ use pageserver_api::models::{
PagestreamFeMessage, PagestreamGetPageRequest, PagestreamGetPageResponse,
PagestreamNblocksRequest, PagestreamNblocksResponse,
};
use postgres_backend::{self, is_expected_io_error, PostgresBackend, QueryError};
use pq_proto::ConnectionError;
use pq_proto::FeStartupPacket;
use pq_proto::{BeMessage, FeMessage, RowDescriptor};
@@ -36,7 +37,6 @@ use utils::{
id::{TenantId, TimelineId},
lsn::Lsn,
postgres_backend::AuthType,
postgres_backend_async::{self, is_expected_io_error, PostgresBackend, QueryError},
simple_rcu::RcuReadGuard,
};
@@ -721,7 +721,7 @@ impl PageServerHandler {
}
#[async_trait::async_trait]
impl postgres_backend_async::Handler for PageServerHandler {
impl postgres_backend::Handler for PageServerHandler {
fn check_auth_jwt(
&mut self,
_pgb: &mut PostgresBackend,

View File

@@ -33,10 +33,11 @@ use crate::{
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_backend::is_expected_io_error;
use postgres_connection::PgConnectionConfig;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::ReplicationFeedback;
use utils::{lsn::Lsn, postgres_backend_async::is_expected_io_error};
use utils::lsn::Lsn;
/// Status of the connection.
#[derive(Debug, Clone, Copy)]

View File

@@ -8,8 +8,8 @@ use pq_proto::{BeMessage, SINGLE_COL_ROWDESC};
use std::{net::TcpStream, thread};
use tracing::{error, info, info_span};
use utils::{
postgres_backend::QueryError,
postgres_backend::{self, AuthType, PostgresBackend},
postgres_backend_async::QueryError,
};
static CPLANE_WAITERS: Lazy<Waiters<ComputeReady>> = Lazy::new(Default::default);

View File

@@ -17,7 +17,7 @@ use pq_proto::{BeMessage, FeStartupPacket, RowDescriptor, INT4_OID, TEXT_OID};
use std::str;
use tracing::info;
use utils::auth::{Claims, Scope};
use utils::postgres_backend_async::QueryError;
use utils::postgres_backend::QueryError;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,

View File

@@ -13,7 +13,7 @@ use bytes::Bytes;
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::id::TenantTimelineId;
use utils::postgres_backend_async::QueryError;
use utils::postgres_backend::QueryError;
use crate::handler::SafekeeperPostgresHandler;
use crate::safekeeper::{AcceptorProposerMessage, AppendResponse, ServerInfo};

View File

@@ -8,7 +8,7 @@ use anyhow::Context;
use bytes::BytesMut;
use tracing::*;
use utils::lsn::Lsn;
use utils::postgres_backend_async::QueryError;
use utils::postgres_backend::QueryError;
use crate::safekeeper::ServerInfo;
use crate::timeline::Timeline;

View File

@@ -16,7 +16,7 @@ use std::net::Shutdown;
use std::sync::Arc;
use std::time::Duration;
use std::{io, str, thread};
use utils::postgres_backend_async::QueryError;
use utils::postgres_backend::QueryError;
use pq_proto::{BeMessage, FeMessage, ReplicationFeedback, WalSndKeepAlive, XLogDataBody};
use tokio::sync::watch::Receiver;

View File

@@ -6,7 +6,7 @@ use regex::Regex;
use std::net::{TcpListener, TcpStream};
use std::thread;
use tracing::*;
use utils::postgres_backend_async::QueryError;
use utils::postgres_backend::QueryError;
use crate::handler::SafekeeperPostgresHandler;
use crate::SafeKeeperConf;