From fdde58120c2e64815469f44d1abea2c413dbdb9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 24 Feb 2025 16:26:28 +0100 Subject: [PATCH] Upgrade proxy crates to edition 2024 (#10942) This upgrades the `proxy/` crate as well as the forked libraries in `libs/proxy/` to edition 2024. Also reformats the imports of those forked libraries via: ``` cargo +nightly fmt -p proxy -p postgres-protocol2 -p postgres-types2 -p tokio-postgres2 -- -l --config imports_granularity=Module,group_imports=StdExternalCrate,reorder_imports=true ``` It can be read commit-by-commit: the first commit has no formatting changes, only changes to accomodate the new edition. Part of #10918 --- libs/proxy/postgres-protocol2/Cargo.toml | 2 +- .../src/authentication/sasl.rs | 14 +++---- libs/proxy/postgres-protocol2/src/lib.rs | 3 +- .../postgres-protocol2/src/message/backend.rs | 8 ++-- .../src/message/frontend.rs | 8 ++-- .../postgres-protocol2/src/password/mod.rs | 3 +- .../proxy/postgres-protocol2/src/types/mod.rs | 7 ++-- libs/proxy/postgres-types2/Cargo.toml | 2 +- libs/proxy/postgres-types2/src/lib.rs | 9 ++-- libs/proxy/postgres-types2/src/private.rs | 6 ++- libs/proxy/tokio-postgres2/Cargo.toml | 2 +- .../proxy/tokio-postgres2/src/cancel_query.rs | 7 ++-- .../tokio-postgres2/src/cancel_query_raw.rs | 7 ++-- .../proxy/tokio-postgres2/src/cancel_token.rs | 10 ++--- libs/proxy/tokio-postgres2/src/client.rs | 41 +++++++++---------- libs/proxy/tokio-postgres2/src/codec.rs | 3 +- libs/proxy/tokio-postgres2/src/config.rs | 20 ++++----- libs/proxy/tokio-postgres2/src/connect.rs | 7 ++-- libs/proxy/tokio-postgres2/src/connect_raw.rs | 32 ++++++++------- .../tokio-postgres2/src/connect_socket.rs | 6 ++- libs/proxy/tokio-postgres2/src/connect_tls.rs | 13 +++--- libs/proxy/tokio-postgres2/src/connection.rs | 24 ++++++----- libs/proxy/tokio-postgres2/src/error/mod.rs | 6 +-- .../tokio-postgres2/src/generic_client.rs | 3 +- libs/proxy/tokio-postgres2/src/lib.rs | 3 +- .../tokio-postgres2/src/maybe_tls_stream.rs | 4 +- libs/proxy/tokio-postgres2/src/prepare.rs | 23 ++++++----- libs/proxy/tokio-postgres2/src/query.rs | 30 +++++++------- libs/proxy/tokio-postgres2/src/row.rs | 15 +++---- .../proxy/tokio-postgres2/src/simple_query.rs | 24 ++++++----- libs/proxy/tokio-postgres2/src/statement.rs | 15 ++++--- libs/proxy/tokio-postgres2/src/tls.rs | 1 + libs/proxy/tokio-postgres2/src/transaction.rs | 3 +- proxy/Cargo.toml | 2 +- proxy/src/auth/backend/console_redirect.rs | 4 +- proxy/src/auth/backend/jwt.rs | 6 +-- proxy/src/auth/backend/local.rs | 2 +- proxy/src/auth/backend/mod.rs | 8 ++-- proxy/src/auth/credentials.rs | 7 +++- proxy/src/auth/mod.rs | 6 +-- proxy/src/binary/local_proxy.rs | 4 +- proxy/src/binary/pg_sni_router.rs | 8 ++-- proxy/src/binary/proxy.rs | 14 ++++--- proxy/src/cache/project_info.rs | 4 +- proxy/src/cache/timed_lru.rs | 4 +- proxy/src/cancellation.rs | 4 +- proxy/src/config.rs | 11 +++-- proxy/src/console_redirect_proxy.rs | 40 +++++++++++++----- proxy/src/context/mod.rs | 2 +- proxy/src/context/parquet.rs | 28 ++++++------- .../control_plane/client/cplane_proxy_v1.rs | 6 +-- proxy/src/control_plane/client/mock.rs | 6 +-- proxy/src/control_plane/client/mod.rs | 6 +-- proxy/src/control_plane/errors.rs | 2 +- proxy/src/control_plane/mgmt.rs | 2 +- proxy/src/control_plane/mod.rs | 2 +- proxy/src/http/health_server.rs | 2 +- proxy/src/http/mod.rs | 2 +- proxy/src/logging.rs | 2 +- proxy/src/metrics.rs | 6 +-- proxy/src/protocol2.rs | 2 +- proxy/src/proxy/connect_compute.rs | 4 +- proxy/src/proxy/copy_bidirectional.rs | 2 +- proxy/src/proxy/mod.rs | 12 +++--- proxy/src/proxy/tests/mod.rs | 6 +-- proxy/src/proxy/wake_compute.rs | 2 +- proxy/src/rate_limiter/leaky_bucket.rs | 2 +- proxy/src/rate_limiter/limit_algorithm.rs | 2 +- proxy/src/rate_limiter/limiter.rs | 2 +- proxy/src/redis/elasticache.rs | 2 +- proxy/src/redis/keys.rs | 2 +- proxy/src/sasl/stream.rs | 2 +- proxy/src/scram/countmin.rs | 2 +- proxy/src/scram/exchange.rs | 4 +- proxy/src/scram/messages.rs | 2 +- proxy/src/scram/mod.rs | 2 +- proxy/src/scram/signature.rs | 2 +- proxy/src/serverless/backend.rs | 10 ++--- proxy/src/serverless/cancel_set.rs | 6 +-- proxy/src/serverless/conn_pool.rs | 8 ++-- proxy/src/serverless/conn_pool_lib.rs | 2 +- proxy/src/serverless/http_conn_pool.rs | 2 +- proxy/src/serverless/json.rs | 2 +- proxy/src/serverless/local_conn_pool.rs | 15 ++++--- proxy/src/serverless/mod.rs | 12 +++--- proxy/src/serverless/sql_over_http.rs | 18 ++++---- proxy/src/serverless/websocket.rs | 12 +++--- proxy/src/signals.rs | 2 +- proxy/src/tls/postgres_rustls.rs | 4 +- proxy/src/tls/server_config.rs | 4 +- proxy/src/usage_metrics.rs | 14 +++---- 91 files changed, 374 insertions(+), 340 deletions(-) diff --git a/libs/proxy/postgres-protocol2/Cargo.toml b/libs/proxy/postgres-protocol2/Cargo.toml index f66a292d5e..7ebb05eec1 100644 --- a/libs/proxy/postgres-protocol2/Cargo.toml +++ b/libs/proxy/postgres-protocol2/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "postgres-protocol2" version = "0.1.0" -edition = "2021" +edition = "2024" license = "MIT/Apache-2.0" [dependencies] diff --git a/libs/proxy/postgres-protocol2/src/authentication/sasl.rs b/libs/proxy/postgres-protocol2/src/authentication/sasl.rs index f2200a40ce..27e05e24ec 100644 --- a/libs/proxy/postgres-protocol2/src/authentication/sasl.rs +++ b/libs/proxy/postgres-protocol2/src/authentication/sasl.rs @@ -1,14 +1,12 @@ //! SASL-based authentication support. +use std::fmt::Write; +use std::{io, iter, mem, str}; + use hmac::{Hmac, Mac}; use rand::{self, Rng}; use sha2::digest::FixedOutput; use sha2::{Digest, Sha256}; -use std::fmt::Write; -use std::io; -use std::iter; -use std::mem; -use std::str; use tokio::task::yield_now; const NONCE_LENGTH: usize = 24; @@ -493,11 +491,9 @@ mod test { let nonce = "9IZ2O01zb9IgiIZ1WJ/zgpJB"; let client_first = "n,,n=,r=9IZ2O01zb9IgiIZ1WJ/zgpJB"; - let server_first = - "r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\ + let server_first = "r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\ =4096"; - let client_final = - "c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\ + let client_final = "c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\ 1NTlQYNs5BTeQjdHdk7lOflDo5re2an8="; let server_final = "v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw="; diff --git a/libs/proxy/postgres-protocol2/src/lib.rs b/libs/proxy/postgres-protocol2/src/lib.rs index 6032440f9a..afbd1e92bd 100644 --- a/libs/proxy/postgres-protocol2/src/lib.rs +++ b/libs/proxy/postgres-protocol2/src/lib.rs @@ -11,9 +11,10 @@ //! set to `UTF8`. It will most likely not behave properly if that is not the case. #![warn(missing_docs, clippy::all)] +use std::io; + use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, BytesMut}; -use std::io; pub mod authentication; pub mod escape; diff --git a/libs/proxy/postgres-protocol2/src/message/backend.rs b/libs/proxy/postgres-protocol2/src/message/backend.rs index 097964f9c1..d7eaef9509 100644 --- a/libs/proxy/postgres-protocol2/src/message/backend.rs +++ b/libs/proxy/postgres-protocol2/src/message/backend.rs @@ -1,13 +1,13 @@ #![allow(missing_docs)] +use std::io::{self, Read}; +use std::ops::Range; +use std::{cmp, str}; + use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use bytes::{Bytes, BytesMut}; use fallible_iterator::FallibleIterator; use memchr::memchr; -use std::cmp; -use std::io::{self, Read}; -use std::ops::Range; -use std::str; use crate::Oid; diff --git a/libs/proxy/postgres-protocol2/src/message/frontend.rs b/libs/proxy/postgres-protocol2/src/message/frontend.rs index 640f35ada3..b447290ea8 100644 --- a/libs/proxy/postgres-protocol2/src/message/frontend.rs +++ b/libs/proxy/postgres-protocol2/src/message/frontend.rs @@ -1,13 +1,13 @@ //! Frontend message serialization. #![allow(missing_docs)] +use std::error::Error; +use std::{io, marker}; + use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, BytesMut}; -use std::error::Error; -use std::io; -use std::marker; -use crate::{write_nullable, FromUsize, IsNull, Oid}; +use crate::{FromUsize, IsNull, Oid, write_nullable}; #[inline] fn write_body(buf: &mut BytesMut, f: F) -> Result<(), E> diff --git a/libs/proxy/postgres-protocol2/src/password/mod.rs b/libs/proxy/postgres-protocol2/src/password/mod.rs index 38eb31dfcf..4cd9bfb060 100644 --- a/libs/proxy/postgres-protocol2/src/password/mod.rs +++ b/libs/proxy/postgres-protocol2/src/password/mod.rs @@ -6,12 +6,13 @@ //! side. This is good because it ensures the cleartext password won't //! end up in logs pg_stat displays, etc. -use crate::authentication::sasl; use hmac::{Hmac, Mac}; use rand::RngCore; use sha2::digest::FixedOutput; use sha2::{Digest, Sha256}; +use crate::authentication::sasl; + #[cfg(test)] mod test; diff --git a/libs/proxy/postgres-protocol2/src/types/mod.rs b/libs/proxy/postgres-protocol2/src/types/mod.rs index 78131c05bf..6a9b334bcb 100644 --- a/libs/proxy/postgres-protocol2/src/types/mod.rs +++ b/libs/proxy/postgres-protocol2/src/types/mod.rs @@ -1,11 +1,12 @@ //! Conversions to and from Postgres's binary format for various types. -use byteorder::{BigEndian, ReadBytesExt}; -use bytes::{BufMut, BytesMut}; -use fallible_iterator::FallibleIterator; use std::boxed::Box as StdBox; use std::error::Error; use std::str; +use byteorder::{BigEndian, ReadBytesExt}; +use bytes::{BufMut, BytesMut}; +use fallible_iterator::FallibleIterator; + use crate::Oid; #[cfg(test)] diff --git a/libs/proxy/postgres-types2/Cargo.toml b/libs/proxy/postgres-types2/Cargo.toml index 57efd94cd3..25ad23ba35 100644 --- a/libs/proxy/postgres-types2/Cargo.toml +++ b/libs/proxy/postgres-types2/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "postgres-types2" version = "0.1.0" -edition = "2021" +edition = "2024" license = "MIT/Apache-2.0" [dependencies] diff --git a/libs/proxy/postgres-types2/src/lib.rs b/libs/proxy/postgres-types2/src/lib.rs index d4f3afdfd4..0ccd8c295f 100644 --- a/libs/proxy/postgres-types2/src/lib.rs +++ b/libs/proxy/postgres-types2/src/lib.rs @@ -4,19 +4,18 @@ //! unless you want to define your own `ToSql` or `FromSql` definitions. #![warn(clippy::all, missing_docs)] -use fallible_iterator::FallibleIterator; -use postgres_protocol2::types; use std::any::type_name; use std::error::Error; use std::fmt; use std::sync::Arc; -use crate::type_gen::{Inner, Other}; - +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; #[doc(inline)] pub use postgres_protocol2::Oid; +use postgres_protocol2::types; -use bytes::BytesMut; +use crate::type_gen::{Inner, Other}; /// Generates a simple implementation of `ToSql::accepts` which accepts the /// types passed to it. diff --git a/libs/proxy/postgres-types2/src/private.rs b/libs/proxy/postgres-types2/src/private.rs index 774f9a301c..188b982812 100644 --- a/libs/proxy/postgres-types2/src/private.rs +++ b/libs/proxy/postgres-types2/src/private.rs @@ -1,7 +1,9 @@ -use crate::{FromSql, Type}; -pub use bytes::BytesMut; use std::error::Error; +pub use bytes::BytesMut; + +use crate::{FromSql, Type}; + pub fn read_be_i32(buf: &mut &[u8]) -> Result> { if buf.len() < 4 { return Err("invalid buffer size".into()); diff --git a/libs/proxy/tokio-postgres2/Cargo.toml b/libs/proxy/tokio-postgres2/Cargo.toml index 161c6b8309..540876742f 100644 --- a/libs/proxy/tokio-postgres2/Cargo.toml +++ b/libs/proxy/tokio-postgres2/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tokio-postgres2" version = "0.1.0" -edition = "2021" +edition = "2024" license = "MIT/Apache-2.0" [dependencies] diff --git a/libs/proxy/tokio-postgres2/src/cancel_query.rs b/libs/proxy/tokio-postgres2/src/cancel_query.rs index cddbf16336..b65fb571e6 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_query.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_query.rs @@ -1,10 +1,11 @@ +use std::io; + use tokio::net::TcpStream; use crate::client::SocketConfig; use crate::config::{Host, SslMode}; use crate::tls::MakeTlsConnect; -use crate::{cancel_query_raw, connect_socket, Error}; -use std::io; +use crate::{Error, cancel_query_raw, connect_socket}; pub(crate) async fn cancel_query( config: Option, @@ -22,7 +23,7 @@ where return Err(Error::connect(io::Error::new( io::ErrorKind::InvalidInput, "unknown host", - ))) + ))); } }; diff --git a/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs b/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs index 8c08296435..c720214e9b 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs @@ -1,10 +1,11 @@ -use crate::config::SslMode; -use crate::tls::TlsConnect; -use crate::{connect_tls, Error}; use bytes::BytesMut; use postgres_protocol2::message::frontend; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use crate::config::SslMode; +use crate::tls::TlsConnect; +use crate::{Error, connect_tls}; + pub async fn cancel_query_raw( stream: S, mode: SslMode, diff --git a/libs/proxy/tokio-postgres2/src/cancel_token.rs b/libs/proxy/tokio-postgres2/src/cancel_token.rs index 718f903a92..f6526395ee 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_token.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_token.rs @@ -1,12 +1,12 @@ -use crate::config::SslMode; -use crate::tls::TlsConnect; - -use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect}; -use crate::{cancel_query_raw, Error}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use crate::client::SocketConfig; +use crate::config::SslMode; +use crate::tls::{MakeTlsConnect, TlsConnect}; +use crate::{Error, cancel_query, cancel_query_raw}; + /// The capability to request cancellation of in-progress queries on a /// connection. #[derive(Clone, Serialize, Deserialize)] diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index 46151ab924..39b1db75da 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -1,31 +1,28 @@ -use crate::codec::{BackendMessages, FrontendMessage}; - -use crate::config::Host; -use crate::config::SslMode; -use crate::connection::{Request, RequestMessages}; - -use crate::query::RowStream; -use crate::simple_query::SimpleQueryStream; - -use crate::types::{Oid, ToSql, Type}; - -use crate::{ - query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row, - SimpleQueryMessage, Statement, Transaction, TransactionBuilder, -}; -use bytes::BytesMut; -use fallible_iterator::FallibleIterator; -use futures_util::{future, ready, TryStreamExt}; -use parking_lot::Mutex; -use postgres_protocol2::message::{backend::Message, frontend}; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; + +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; +use futures_util::{TryStreamExt, future, ready}; +use parking_lot::Mutex; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; +use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use std::time::Duration; +use crate::codec::{BackendMessages, FrontendMessage}; +use crate::config::{Host, SslMode}; +use crate::connection::{Request, RequestMessages}; +use crate::query::RowStream; +use crate::simple_query::SimpleQueryStream; +use crate::types::{Oid, ToSql, Type}; +use crate::{ + CancelToken, Error, ReadyForQueryStatus, Row, SimpleQueryMessage, Statement, Transaction, + TransactionBuilder, query, simple_query, slice_iter, +}; pub struct Responses { receiver: mpsc::Receiver, diff --git a/libs/proxy/tokio-postgres2/src/codec.rs b/libs/proxy/tokio-postgres2/src/codec.rs index 0ec46198ce..f1fd9b47b3 100644 --- a/libs/proxy/tokio-postgres2/src/codec.rs +++ b/libs/proxy/tokio-postgres2/src/codec.rs @@ -1,8 +1,9 @@ +use std::io; + use bytes::{Buf, Bytes, BytesMut}; use fallible_iterator::FallibleIterator; use postgres_protocol2::message::backend; use postgres_protocol2::message::frontend::CopyData; -use std::io; use tokio_util::codec::{Decoder, Encoder}; pub enum FrontendMessage { diff --git a/libs/proxy/tokio-postgres2/src/config.rs b/libs/proxy/tokio-postgres2/src/config.rs index 47cc45ac80..4c25491b67 100644 --- a/libs/proxy/tokio-postgres2/src/config.rs +++ b/libs/proxy/tokio-postgres2/src/config.rs @@ -1,21 +1,19 @@ //! Connection configuration. -use crate::connect::connect; -use crate::connect_raw::connect_raw; -use crate::connect_raw::RawConnection; -use crate::tls::MakeTlsConnect; -use crate::tls::TlsConnect; -use crate::{Client, Connection, Error}; -use postgres_protocol2::message::frontend::StartupMessageParams; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::str; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; +use std::{fmt, str}; pub use postgres_protocol2::authentication::sasl::ScramKeys; +use postgres_protocol2::message::frontend::StartupMessageParams; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use crate::connect::connect; +use crate::connect_raw::{RawConnection, connect_raw}; +use crate::tls::{MakeTlsConnect, TlsConnect}; +use crate::{Client, Connection, Error}; + /// TLS configuration. #[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] #[non_exhaustive] diff --git a/libs/proxy/tokio-postgres2/src/connect.rs b/libs/proxy/tokio-postgres2/src/connect.rs index e0cb69748d..d2bd0dfbcd 100644 --- a/libs/proxy/tokio-postgres2/src/connect.rs +++ b/libs/proxy/tokio-postgres2/src/connect.rs @@ -1,3 +1,7 @@ +use postgres_protocol2::message::backend::Message; +use tokio::net::TcpStream; +use tokio::sync::mpsc; + use crate::client::SocketConfig; use crate::codec::BackendMessage; use crate::config::Host; @@ -5,9 +9,6 @@ use crate::connect_raw::connect_raw; use crate::connect_socket::connect_socket; use crate::tls::{MakeTlsConnect, TlsConnect}; use crate::{Client, Config, Connection, Error, RawConnection}; -use postgres_protocol2::message::backend::Message; -use tokio::net::TcpStream; -use tokio::sync::mpsc; pub async fn connect( mut tls: T, diff --git a/libs/proxy/tokio-postgres2/src/connect_raw.rs b/libs/proxy/tokio-postgres2/src/connect_raw.rs index 66db85e07d..20dc538cf2 100644 --- a/libs/proxy/tokio-postgres2/src/connect_raw.rs +++ b/libs/proxy/tokio-postgres2/src/connect_raw.rs @@ -1,22 +1,24 @@ +use std::collections::HashMap; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; +use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready}; +use postgres_protocol2::authentication::sasl; +use postgres_protocol2::authentication::sasl::ScramSha256; +use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody}; +use postgres_protocol2::message::frontend; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::codec::Framed; + +use crate::Error; use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; use crate::config::{self, AuthKeys, Config}; use crate::connect_tls::connect_tls; use crate::maybe_tls_stream::MaybeTlsStream; use crate::tls::{TlsConnect, TlsStream}; -use crate::Error; -use bytes::BytesMut; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt}; -use postgres_protocol2::authentication::sasl; -use postgres_protocol2::authentication::sasl::ScramSha256; -use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody}; -use postgres_protocol2::message::frontend; -use std::collections::HashMap; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec::Framed; pub struct StartupStream { inner: Framed, PostgresCodec>, @@ -158,7 +160,7 @@ where | Some(Message::AuthenticationSspi) => { return Err(Error::authentication( "unsupported authentication method".into(), - )) + )); } Some(Message::ErrorResponse(body)) => return Err(Error::db(body)), Some(_) => return Err(Error::unexpected_message()), diff --git a/libs/proxy/tokio-postgres2/src/connect_socket.rs b/libs/proxy/tokio-postgres2/src/connect_socket.rs index 336a13317f..15411f7ef3 100644 --- a/libs/proxy/tokio-postgres2/src/connect_socket.rs +++ b/libs/proxy/tokio-postgres2/src/connect_socket.rs @@ -1,11 +1,13 @@ -use crate::config::Host; -use crate::Error; use std::future::Future; use std::io; use std::time::Duration; + use tokio::net::{self, TcpStream}; use tokio::time; +use crate::Error; +use crate::config::Host; + pub(crate) async fn connect_socket( host: &Host, port: u16, diff --git a/libs/proxy/tokio-postgres2/src/connect_tls.rs b/libs/proxy/tokio-postgres2/src/connect_tls.rs index 64b0b68abc..4dc929a9e2 100644 --- a/libs/proxy/tokio-postgres2/src/connect_tls.rs +++ b/libs/proxy/tokio-postgres2/src/connect_tls.rs @@ -1,12 +1,13 @@ -use crate::config::SslMode; -use crate::maybe_tls_stream::MaybeTlsStream; -use crate::tls::private::ForcePrivateApi; -use crate::tls::TlsConnect; -use crate::Error; use bytes::BytesMut; use postgres_protocol2::message::frontend; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use crate::Error; +use crate::config::SslMode; +use crate::maybe_tls_stream::MaybeTlsStream; +use crate::tls::TlsConnect; +use crate::tls::private::ForcePrivateApi; + pub async fn connect_tls( mut stream: S, mode: SslMode, @@ -19,7 +20,7 @@ where match mode { SslMode::Disable => return Ok(MaybeTlsStream::Raw(stream)), SslMode::Prefer if !tls.can_connect(ForcePrivateApi) => { - return Ok(MaybeTlsStream::Raw(stream)) + return Ok(MaybeTlsStream::Raw(stream)); } SslMode::Prefer | SslMode::Require => {} } diff --git a/libs/proxy/tokio-postgres2/src/connection.rs b/libs/proxy/tokio-postgres2/src/connection.rs index f478717e0d..60e39b3b44 100644 --- a/libs/proxy/tokio-postgres2/src/connection.rs +++ b/libs/proxy/tokio-postgres2/src/connection.rs @@ -1,22 +1,24 @@ -use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; -use crate::error::DbError; -use crate::maybe_tls_stream::MaybeTlsStream; -use crate::{AsyncMessage, Error, Notification}; -use bytes::BytesMut; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Sink, Stream}; -use log::{info, trace}; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; + +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; +use futures_util::{Sink, Stream, ready}; +use log::{info, trace}; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::mpsc; use tokio_util::codec::Framed; use tokio_util::sync::PollSender; +use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; +use crate::error::DbError; +use crate::maybe_tls_stream::MaybeTlsStream; +use crate::{AsyncMessage, Error, Notification}; + pub enum RequestMessages { Single(FrontendMessage), } @@ -139,7 +141,7 @@ where Some(response) => response, None => match messages.next().map_err(Error::parse)? { Some(Message::ErrorResponse(error)) => { - return Poll::Ready(Err(Error::db(error))) + return Poll::Ready(Err(Error::db(error))); } _ => return Poll::Ready(Err(Error::unexpected_message())), }, diff --git a/libs/proxy/tokio-postgres2/src/error/mod.rs b/libs/proxy/tokio-postgres2/src/error/mod.rs index 922c348525..b12e76e5bf 100644 --- a/libs/proxy/tokio-postgres2/src/error/mod.rs +++ b/libs/proxy/tokio-postgres2/src/error/mod.rs @@ -1,10 +1,10 @@ //! Errors. +use std::error::{self, Error as _Error}; +use std::{fmt, io}; + use fallible_iterator::FallibleIterator; use postgres_protocol2::message::backend::{ErrorFields, ErrorResponseBody}; -use std::error::{self, Error as _Error}; -use std::fmt; -use std::io; pub use self::sqlstate::*; diff --git a/libs/proxy/tokio-postgres2/src/generic_client.rs b/libs/proxy/tokio-postgres2/src/generic_client.rs index 042b5a675e..31c3d8fa3e 100644 --- a/libs/proxy/tokio-postgres2/src/generic_client.rs +++ b/libs/proxy/tokio-postgres2/src/generic_client.rs @@ -1,9 +1,10 @@ #![allow(async_fn_in_trait)] +use postgres_protocol2::Oid; + use crate::query::RowStream; use crate::types::Type; use crate::{Client, Error, Transaction}; -use postgres_protocol2::Oid; mod private { pub trait Sealed {} diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs index 7426279167..c8ebba5487 100644 --- a/libs/proxy/tokio-postgres2/src/lib.rs +++ b/libs/proxy/tokio-postgres2/src/lib.rs @@ -1,6 +1,8 @@ //! An asynchronous, pipelined, PostgreSQL client. #![warn(clippy::all)] +use postgres_protocol2::message::backend::ReadyForQueryBody; + pub use crate::cancel_token::CancelToken; pub use crate::client::{Client, SocketConfig}; pub use crate::config::Config; @@ -17,7 +19,6 @@ pub use crate::tls::NoTls; pub use crate::transaction::Transaction; pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder}; use crate::types::ToSql; -use postgres_protocol2::message::backend::ReadyForQueryBody; /// After executing a query, the connection will be in one of these states #[derive(Clone, Copy, Debug, PartialEq)] diff --git a/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs b/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs index 9a7e248997..4aa838613e 100644 --- a/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs +++ b/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs @@ -1,12 +1,14 @@ //! MaybeTlsStream. //! //! Represents a stream that may or may not be encrypted with TLS. -use crate::tls::{ChannelBinding, TlsStream}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::tls::{ChannelBinding, TlsStream}; + /// A stream that may or may not be encrypted with TLS. pub enum MaybeTlsStream { /// An unencrypted stream. diff --git a/libs/proxy/tokio-postgres2/src/prepare.rs b/libs/proxy/tokio-postgres2/src/prepare.rs index 58bbb26cbc..b36d2e5f74 100644 --- a/libs/proxy/tokio-postgres2/src/prepare.rs +++ b/libs/proxy/tokio-postgres2/src/prepare.rs @@ -1,18 +1,19 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use bytes::Bytes; +use fallible_iterator::FallibleIterator; +use futures_util::{TryStreamExt, pin_mut}; +use log::debug; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; + use crate::client::InnerClient; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::types::{Field, Kind, Oid, Type}; -use crate::{query, slice_iter}; -use crate::{Column, Error, Statement}; -use bytes::Bytes; -use fallible_iterator::FallibleIterator; -use futures_util::{pin_mut, TryStreamExt}; -use log::debug; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; +use crate::{Column, Error, Statement, query, slice_iter}; pub(crate) const TYPEINFO_QUERY: &str = "\ SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid diff --git a/libs/proxy/tokio-postgres2/src/query.rs b/libs/proxy/tokio-postgres2/src/query.rs index e21631c85d..29f05fba79 100644 --- a/libs/proxy/tokio-postgres2/src/query.rs +++ b/libs/proxy/tokio-postgres2/src/query.rs @@ -1,22 +1,24 @@ -use crate::client::{InnerClient, Responses}; -use crate::codec::FrontendMessage; -use crate::connection::RequestMessages; -use crate::types::IsNull; -use crate::{Column, Error, ReadyForQueryStatus, Row, Statement}; -use bytes::{BufMut, Bytes, BytesMut}; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Stream}; -use log::{debug, log_enabled, Level}; -use pin_project_lite::pin_project; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; -use postgres_types2::{Format, ToSql, Type}; use std::fmt; use std::marker::PhantomPinned; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use bytes::{BufMut, Bytes, BytesMut}; +use fallible_iterator::FallibleIterator; +use futures_util::{Stream, ready}; +use log::{Level, debug, log_enabled}; +use pin_project_lite::pin_project; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; +use postgres_types2::{Format, ToSql, Type}; + +use crate::client::{InnerClient, Responses}; +use crate::codec::FrontendMessage; +use crate::connection::RequestMessages; +use crate::types::IsNull; +use crate::{Column, Error, ReadyForQueryStatus, Row, Statement}; + struct BorrowToSqlParamsDebug<'a>(&'a [&'a (dyn ToSql + Sync)]); impl fmt::Debug for BorrowToSqlParamsDebug<'_> { @@ -257,7 +259,7 @@ impl Stream for RowStream { this.statement.clone(), body, *this.output_format, - )?))) + )?))); } Message::EmptyQueryResponse | Message::PortalSuspended => {} Message::CommandComplete(body) => { diff --git a/libs/proxy/tokio-postgres2/src/row.rs b/libs/proxy/tokio-postgres2/src/row.rs index 10e130707d..5fc955eef4 100644 --- a/libs/proxy/tokio-postgres2/src/row.rs +++ b/libs/proxy/tokio-postgres2/src/row.rs @@ -1,17 +1,18 @@ //! Rows. +use std::ops::Range; +use std::sync::Arc; +use std::{fmt, str}; + +use fallible_iterator::FallibleIterator; +use postgres_protocol2::message::backend::DataRowBody; +use postgres_types2::{Format, WrongFormat}; + use crate::row::sealed::{AsName, Sealed}; use crate::simple_query::SimpleColumn; use crate::statement::Column; use crate::types::{FromSql, Type, WrongType}; use crate::{Error, Statement}; -use fallible_iterator::FallibleIterator; -use postgres_protocol2::message::backend::DataRowBody; -use postgres_types2::{Format, WrongFormat}; -use std::fmt; -use std::ops::Range; -use std::str; -use std::sync::Arc; mod sealed { pub trait Sealed {} diff --git a/libs/proxy/tokio-postgres2/src/simple_query.rs b/libs/proxy/tokio-postgres2/src/simple_query.rs index fb2550377b..f13d63983f 100644 --- a/libs/proxy/tokio-postgres2/src/simple_query.rs +++ b/libs/proxy/tokio-postgres2/src/simple_query.rs @@ -1,19 +1,21 @@ -use crate::client::{InnerClient, Responses}; -use crate::codec::FrontendMessage; -use crate::connection::RequestMessages; -use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow}; -use bytes::Bytes; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Stream}; -use log::debug; -use pin_project_lite::pin_project; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; use std::marker::PhantomPinned; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use bytes::Bytes; +use fallible_iterator::FallibleIterator; +use futures_util::{Stream, ready}; +use log::debug; +use pin_project_lite::pin_project; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; + +use crate::client::{InnerClient, Responses}; +use crate::codec::FrontendMessage; +use crate::connection::RequestMessages; +use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow}; + /// Information about a column of a single query row. #[derive(Debug)] pub struct SimpleColumn { diff --git a/libs/proxy/tokio-postgres2/src/statement.rs b/libs/proxy/tokio-postgres2/src/statement.rs index 591872fbc5..e4828db712 100644 --- a/libs/proxy/tokio-postgres2/src/statement.rs +++ b/libs/proxy/tokio-postgres2/src/statement.rs @@ -1,15 +1,14 @@ +use std::fmt; +use std::sync::{Arc, Weak}; + +use postgres_protocol2::Oid; +use postgres_protocol2::message::backend::Field; +use postgres_protocol2::message::frontend; + use crate::client::InnerClient; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::types::Type; -use postgres_protocol2::{ - message::{backend::Field, frontend}, - Oid, -}; -use std::{ - fmt, - sync::{Arc, Weak}, -}; struct StatementInner { client: Weak, diff --git a/libs/proxy/tokio-postgres2/src/tls.rs b/libs/proxy/tokio-postgres2/src/tls.rs index dc8140719f..41b51368ff 100644 --- a/libs/proxy/tokio-postgres2/src/tls.rs +++ b/libs/proxy/tokio-postgres2/src/tls.rs @@ -5,6 +5,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub(crate) mod private { diff --git a/libs/proxy/tokio-postgres2/src/transaction.rs b/libs/proxy/tokio-postgres2/src/transaction.rs index 03a57e4947..eecbfc5873 100644 --- a/libs/proxy/tokio-postgres2/src/transaction.rs +++ b/libs/proxy/tokio-postgres2/src/transaction.rs @@ -1,8 +1,9 @@ +use postgres_protocol2::message::frontend; + use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::query::RowStream; use crate::{CancelToken, Client, Error, ReadyForQueryStatus}; -use postgres_protocol2::message::frontend; /// A representation of a PostgreSQL database transaction. /// diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 6a381bf094..5964b76ecf 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "proxy" version = "0.1.0" -edition.workspace = true +edition = "2024" license.workspace = true [features] diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index 7503b4eac9..dd48384c03 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -8,16 +8,16 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{info, info_span}; use super::ComputeCredentialKeys; -use crate::auth::backend::ComputeUserInfo; use crate::auth::IpPattern; +use crate::auth::backend::ComputeUserInfo; use crate::cache::Cached; use crate::config::AuthenticationConfig; use crate::context::RequestContext; use crate::control_plane::client::cplane_proxy_v1; use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; use crate::error::{ReportableError, UserFacingError}; -use crate::proxy::connect_compute::ComputeConnectBackend; use crate::proxy::NeonOptions; +use crate::proxy::connect_compute::ComputeConnectBackend; use crate::stream::PqStream; use crate::types::RoleName; use crate::{auth, compute, waiters}; diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs index 5d032c0deb..942f1e13d1 100644 --- a/proxy/src/auth/backend/jwt.rs +++ b/proxy/src/auth/backend/jwt.rs @@ -6,9 +6,9 @@ use std::time::{Duration, SystemTime}; use arc_swap::ArcSwapOption; use clashmap::ClashMap; use jose_jwk::crypto::KeyInfo; -use reqwest::{redirect, Client}; -use reqwest_retry::policies::ExponentialBackoff; +use reqwest::{Client, redirect}; use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; use serde::de::Visitor; use serde::{Deserialize, Deserializer}; use serde_json::value::RawValue; @@ -498,8 +498,8 @@ fn verify_rsa_signature( alg: &jose_jwa::Algorithm, ) -> Result<(), JwtError> { use jose_jwa::{Algorithm, Signing}; - use rsa::pkcs1v15::{Signature, VerifyingKey}; use rsa::RsaPublicKey; + use rsa::pkcs1v15::{Signature, VerifyingKey}; let key = RsaPublicKey::try_from(key).map_err(JwtError::InvalidRsaKey)?; diff --git a/proxy/src/auth/backend/local.rs b/proxy/src/auth/backend/local.rs index d10f0e82b2..9c3a3772cd 100644 --- a/proxy/src/auth/backend/local.rs +++ b/proxy/src/auth/backend/local.rs @@ -8,8 +8,8 @@ use crate::auth::backend::jwt::FetchAuthRulesError; use crate::compute::ConnCfg; use crate::compute_ctl::ComputeCtlApi; use crate::context::RequestContext; -use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo}; use crate::control_plane::NodeInfo; +use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo}; use crate::http; use crate::intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag}; use crate::types::EndpointId; diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index 8f1625278f..83feed5094 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -18,7 +18,7 @@ use tracing::{debug, info, warn}; use crate::auth::credentials::check_peer_addr_is_in_list; use crate::auth::{ - self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint, IpPattern, + self, AuthError, ComputeUserInfoMaybeEndpoint, IpPattern, validate_password_and_exchange, }; use crate::cache::Cached; use crate::config::AuthenticationConfig; @@ -32,8 +32,8 @@ use crate::control_plane::{ use crate::intern::EndpointIdInt; use crate::metrics::Metrics; use crate::protocol2::ConnectionInfoExtra; -use crate::proxy::connect_compute::ComputeConnectBackend; use crate::proxy::NeonOptions; +use crate::proxy::connect_compute::ComputeConnectBackend; use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter}; use crate::stream::Stream; use crate::types::{EndpointCacheKey, EndpointId, RoleName}; @@ -542,7 +542,7 @@ mod tests { use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use super::jwt::JwkCache; - use super::{auth_quirks, AuthRateLimiter}; + use super::{AuthRateLimiter, auth_quirks}; use crate::auth::backend::MaskedIp; use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern}; use crate::config::AuthenticationConfig; @@ -553,8 +553,8 @@ mod tests { }; use crate::proxy::NeonOptions; use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo}; - use crate::scram::threadpool::ThreadPool; use crate::scram::ServerSecret; + use crate::scram::threadpool::ThreadPool; use crate::stream::{PqStream, Stream}; struct Auth { diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index eff49a402a..c1b7718e4f 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -197,7 +197,10 @@ impl<'de> serde::de::Deserialize<'de> for IpPattern { type Value = IpPattern; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "comma separated list with ip address, ip address range, or ip address subnet mask") + write!( + formatter, + "comma separated list with ip address, ip address range, or ip address subnet mask" + ) } fn visit_str(self, v: &str) -> Result @@ -252,8 +255,8 @@ fn project_name_valid(name: &str) -> bool { #[cfg(test)] #[expect(clippy::unwrap_used)] mod tests { - use serde_json::json; use ComputeUserInfoParseError::*; + use serde_json::json; use super::*; diff --git a/proxy/src/auth/mod.rs b/proxy/src/auth/mod.rs index 6082695a6b..5670f8e43d 100644 --- a/proxy/src/auth/mod.rs +++ b/proxy/src/auth/mod.rs @@ -5,13 +5,13 @@ pub use backend::Backend; mod credentials; pub(crate) use credentials::{ - check_peer_addr_is_in_list, endpoint_sni, ComputeUserInfoMaybeEndpoint, - ComputeUserInfoParseError, IpPattern, + ComputeUserInfoMaybeEndpoint, ComputeUserInfoParseError, IpPattern, check_peer_addr_is_in_list, + endpoint_sni, }; mod password_hack; -pub(crate) use password_hack::parse_endpoint_param; use password_hack::PasswordHackPayload; +pub(crate) use password_hack::parse_endpoint_param; mod flow; use std::io; diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index 4ab11f828c..dedd225cba 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use anyhow::{bail, ensure, Context}; +use anyhow::{Context, bail, ensure}; use camino::{Utf8Path, Utf8PathBuf}; use clap::Parser; use compute_api::spec::LocalProxySpec; @@ -19,7 +19,7 @@ use utils::sentry_init::init_sentry; use utils::{pid_file, project_build_tag, project_git_version}; use crate::auth::backend::jwt::JwkCache; -use crate::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP}; +use crate::auth::backend::local::{JWKS_ROLE_MAP, LocalBackend}; use crate::auth::{self}; use crate::cancellation::CancellationHandler; use crate::config::{ diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index 94e771a61c..1aa290399c 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -5,24 +5,24 @@ /// the outside. Similar to an ingress controller for HTTPS. use std::{net::SocketAddr, sync::Arc}; -use anyhow::{anyhow, bail, ensure, Context}; +use anyhow::{Context, anyhow, bail, ensure}; use clap::Arg; -use futures::future::Either; use futures::TryFutureExt; +use futures::future::Either; use itertools::Itertools; use rustls::crypto::ring; use rustls::pki_types::PrivateKeyDer; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; -use tracing::{error, info, Instrument}; +use tracing::{Instrument, error, info}; use utils::project_git_version; use utils::sentry_init::init_sentry; use crate::context::RequestContext; use crate::metrics::{Metrics, ThreadPoolMetrics}; use crate::protocol2::ConnectionInfo; -use crate::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource}; +use crate::proxy::{ErrorSource, copy_bidirectional_client_compute, run_until_cancelled}; use crate::stream::{PqStream, Stream}; use crate::tls::TlsServerEndPoint; diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index b72799df54..eec0bf8f99 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -9,16 +9,16 @@ use remote_storage::RemoteStorageConfig; use tokio::net::TcpListener; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::{info, warn, Instrument}; +use tracing::{Instrument, info, warn}; use utils::sentry_init::init_sentry; use utils::{project_build_tag, project_git_version}; use crate::auth::backend::jwt::JwkCache; use crate::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned}; -use crate::cancellation::{handle_cancel_messages, CancellationHandler}; +use crate::cancellation::{CancellationHandler, handle_cancel_messages}; use crate::config::{ - self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, - ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, + self, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, ProjectInfoCacheOptions, + ProxyConfig, ProxyProtocolV2, remote_storage_from_toml, }; use crate::context::parquet::ParquetUploadArgs; use crate::http::health_server::AppMetrics; @@ -30,8 +30,8 @@ use crate::redis::connection_with_credentials_provider::ConnectionWithCredential use crate::redis::kv_ops::RedisKVClient; use crate::redis::{elasticache, notifications}; use crate::scram::threadpool::ThreadPool; -use crate::serverless::cancel_set::CancelSet; use crate::serverless::GlobalConnPoolOptions; +use crate::serverless::cancel_set::CancelSet; use crate::tls::client_config::compute_client_config_with_root_certs; use crate::{auth, control_plane, http, serverless, usage_metrics}; @@ -331,7 +331,9 @@ pub async fn run() -> anyhow::Result<()> { ), ), (None, None) => { - warn!("irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"); + warn!( + "irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client" + ); None } _ => { diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 7651eb71a2..e153e9f61f 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -1,12 +1,12 @@ use std::collections::HashSet; use std::convert::Infallible; -use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use std::time::Duration; use async_trait::async_trait; use clashmap::ClashMap; -use rand::{thread_rng, Rng}; +use rand::{Rng, thread_rng}; use smol_str::SmolStr; use tokio::sync::Mutex; use tokio::time::Instant; diff --git a/proxy/src/cache/timed_lru.rs b/proxy/src/cache/timed_lru.rs index 06eaeb9a30..7cfe5100ea 100644 --- a/proxy/src/cache/timed_lru.rs +++ b/proxy/src/cache/timed_lru.rs @@ -11,11 +11,11 @@ use std::time::{Duration, Instant}; // This severely hinders its usage both in terms of creating wrappers and supported key types. // // On the other hand, `hashlink` has good download stats and appears to be maintained. -use hashlink::{linked_hash_map::RawEntryMut, LruCache}; +use hashlink::{LruCache, linked_hash_map::RawEntryMut}; use tracing::debug; use super::common::Cached; -use super::{timed_lru, Cache}; +use super::{Cache, timed_lru}; /// An implementation of timed LRU cache with fixed capacity. /// Key properties: diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 422e6f741d..8263e5aa2a 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -3,8 +3,8 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use postgres_client::tls::MakeTlsConnect; use postgres_client::CancelToken; +use postgres_client::tls::MakeTlsConnect; use pq_proto::CancelKeyData; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -13,7 +13,7 @@ use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info}; use crate::auth::backend::ComputeUserInfo; -use crate::auth::{check_peer_addr_is_in_list, AuthError}; +use crate::auth::{AuthError, check_peer_addr_is_in_list}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::ControlPlaneApi; diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 460e0cff54..1bcd22e98f 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -2,18 +2,18 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use anyhow::{bail, ensure, Context, Ok}; +use anyhow::{Context, Ok, bail, ensure}; use clap::ValueEnum; use remote_storage::RemoteStorageConfig; -use crate::auth::backend::jwt::JwkCache; use crate::auth::backend::AuthRateLimiter; +use crate::auth::backend::jwt::JwkCache; use crate::control_plane::locks::ApiLocks; use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}; use crate::scram::threadpool::ThreadPool; -use crate::serverless::cancel_set::CancelSet; use crate::serverless::GlobalConnPoolOptions; -pub use crate::tls::server_config::{configure_tls, TlsConfig}; +use crate::serverless::cancel_set::CancelSet; +pub use crate::tls::server_config::{TlsConfig, configure_tls}; use crate::types::Host; pub struct ProxyConfig { @@ -97,8 +97,7 @@ pub struct EndpointCacheConfig { impl EndpointCacheConfig { /// Default options for [`crate::control_plane::NodeInfoCache`]. /// Notice that by default the limiter is empty, which means that cache is disabled. - pub const CACHE_DEFAULT_OPTIONS: &'static str = - "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s"; + pub const CACHE_DEFAULT_OPTIONS: &'static str = "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s"; /// Parse cache options passed via cmdline. /// Example: [`Self::CACHE_DEFAULT_OPTIONS`]. diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index a2e7299d39..4662860b3f 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use futures::{FutureExt, TryFutureExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, Instrument}; +use tracing::{Instrument, debug, error, info}; use crate::auth::backend::ConsoleRedirectBackend; use crate::cancellation::CancellationHandler; @@ -11,12 +11,12 @@ use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::{Metrics, NumClientConnectionsGuard}; -use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo}; -use crate::proxy::connect_compute::{connect_to_compute, TcpMechanism}; -use crate::proxy::handshake::{handshake, HandshakeData}; +use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol}; +use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute}; +use crate::proxy::handshake::{HandshakeData, handshake}; use crate::proxy::passthrough::ProxyPassthrough; use crate::proxy::{ - prepare_client_connection, run_until_cancelled, ClientRequestError, ErrorSource, + ClientRequestError, ErrorSource, prepare_client_connection, run_until_cancelled, }; pub async fn task_main( @@ -64,22 +64,34 @@ pub async fn task_main( debug!("healthcheck received"); return; } - Ok((_socket, ConnectHeader::Missing)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => { + Ok((_socket, ConnectHeader::Missing)) + if config.proxy_protocol_v2 == ProxyProtocolV2::Required => + { error!("missing required proxy protocol header"); return; } - Ok((_socket, ConnectHeader::Proxy(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => { + Ok((_socket, ConnectHeader::Proxy(_))) + if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => + { error!("proxy protocol header not supported"); return; } Ok((socket, ConnectHeader::Proxy(info))) => (socket, info), - Ok((socket, ConnectHeader::Missing)) => (socket, ConnectionInfo{ addr: peer_addr, extra: None }), + Ok((socket, ConnectHeader::Missing)) => ( + socket, + ConnectionInfo { + addr: peer_addr, + extra: None, + }, + ), }; match socket.inner.set_nodelay(true) { Ok(()) => {} Err(e) => { - error!("per-client task finished with an error: failed to set socket option: {e:#}"); + error!( + "per-client task finished with an error: failed to set socket option: {e:#}" + ); return; } } @@ -118,10 +130,16 @@ pub async fn task_main( match p.proxy_pass(&config.connect_to_compute).await { Ok(()) => {} Err(ErrorSource::Client(e)) => { - error!(?session_id, "per-client task finished with an IO error from the client: {e:#}"); + error!( + ?session_id, + "per-client task finished with an IO error from the client: {e:#}" + ); } Err(ErrorSource::Compute(e)) => { - error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}"); + error!( + ?session_id, + "per-client task finished with an IO error from the compute: {e:#}" + ); } } } diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 3236b2e1bf..74b48a1bea 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -8,7 +8,7 @@ use pq_proto::StartupMessageParams; use smol_str::SmolStr; use tokio::sync::mpsc; use tracing::field::display; -use tracing::{debug, error, info_span, Span}; +use tracing::{Span, debug, error, info_span}; use try_lock::TryLock; use uuid::Uuid; diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 0537ae6a62..f029327266 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -8,7 +8,7 @@ use chrono::{Datelike, Timelike}; use futures::{Stream, StreamExt}; use parquet::basic::Compression; use parquet::file::metadata::RowGroupMetaDataPtr; -use parquet::file::properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE}; +use parquet::file::properties::{DEFAULT_PAGE_SIZE, WriterProperties, WriterPropertiesPtr}; use parquet::file::writer::SerializedFileWriter; use parquet::record::RecordWriter; use pq_proto::StartupMessageParams; @@ -17,10 +17,10 @@ use serde::ser::SerializeMap; use tokio::sync::mpsc; use tokio::time; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, Span}; +use tracing::{Span, debug, info}; use utils::backoff; -use super::{RequestContextInner, LOG_CHAN}; +use super::{LOG_CHAN, RequestContextInner}; use crate::config::remote_storage_from_toml; use crate::context::LOG_CHAN_DISCONNECT; use crate::ext::TaskExt; @@ -425,20 +425,20 @@ mod tests { use futures::{Stream, StreamExt}; use itertools::Itertools; use parquet::basic::{Compression, ZstdLevel}; - use parquet::file::properties::{WriterProperties, DEFAULT_PAGE_SIZE}; + use parquet::file::properties::{DEFAULT_PAGE_SIZE, WriterProperties}; use parquet::file::reader::FileReader; use parquet::file::serialized_reader::SerializedFileReader; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use remote_storage::{ - GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT, + GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config, }; use tokio::sync::mpsc; use tokio::time; use walkdir::WalkDir; - use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData}; + use super::{ParquetConfig, ParquetUploadArgs, RequestData, worker_inner}; #[derive(Parser)] struct ProxyCliArgs { @@ -514,26 +514,26 @@ mod tests { fn generate_request_data(rng: &mut impl Rng) -> RequestData { RequestData { - session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(), - peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(), + session_id: uuid::Builder::from_random_bytes(rng.r#gen()).into_uuid(), + peer_addr: Ipv4Addr::from(rng.r#gen::<[u8; 4]>()).to_string(), timestamp: chrono::DateTime::from_timestamp_millis( rng.gen_range(1703862754..1803862754), ) .unwrap() .naive_utc(), application_name: Some("test".to_owned()), - username: Some(hex::encode(rng.gen::<[u8; 4]>())), - endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())), - database: Some(hex::encode(rng.gen::<[u8; 16]>())), - project: Some(hex::encode(rng.gen::<[u8; 16]>())), - branch: Some(hex::encode(rng.gen::<[u8; 16]>())), + username: Some(hex::encode(rng.r#gen::<[u8; 4]>())), + endpoint_id: Some(hex::encode(rng.r#gen::<[u8; 16]>())), + database: Some(hex::encode(rng.r#gen::<[u8; 16]>())), + project: Some(hex::encode(rng.r#gen::<[u8; 16]>())), + branch: Some(hex::encode(rng.r#gen::<[u8; 16]>())), pg_options: None, auth_method: None, jwt_issuer: None, protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)], region: "us-east-1", error: None, - success: rng.gen(), + success: rng.r#gen(), cold_start_info: "no", duration_us: rng.gen_range(0..30_000_000), disconnect_timestamp: None, diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index ef6621fc59..977fcf4727 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -3,16 +3,16 @@ use std::sync::Arc; use std::time::Duration; -use ::http::header::AUTHORIZATION; use ::http::HeaderName; +use ::http::header::AUTHORIZATION; use futures::TryFutureExt; use postgres_client::config::SslMode; use tokio::time::Instant; -use tracing::{debug, info, info_span, warn, Instrument}; +use tracing::{Instrument, debug, info, info_span, warn}; use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute}; -use crate::auth::backend::jwt::AuthRule; use crate::auth::backend::ComputeUserInfo; +use crate::auth::backend::jwt::AuthRule; use crate::cache::Cached; use crate::context::RequestContext; use crate::control_plane::caches::ApiCaches; diff --git a/proxy/src/control_plane/client/mock.rs b/proxy/src/control_plane/client/mock.rs index 1e6cde8fb0..7da5464aa5 100644 --- a/proxy/src/control_plane/client/mock.rs +++ b/proxy/src/control_plane/client/mock.rs @@ -6,11 +6,11 @@ use std::sync::Arc; use futures::TryFutureExt; use thiserror::Error; use tokio_postgres::Client; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{Instrument, error, info, info_span, warn}; -use crate::auth::backend::jwt::AuthRule; -use crate::auth::backend::ComputeUserInfo; use crate::auth::IpPattern; +use crate::auth::backend::ComputeUserInfo; +use crate::auth::backend::jwt::AuthRule; use crate::cache::Cached; use crate::context::RequestContext; use crate::control_plane::client::{ diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index c28ff4789d..746595de38 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -10,15 +10,15 @@ use clashmap::ClashMap; use tokio::time::Instant; use tracing::{debug, info}; -use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError}; use crate::auth::backend::ComputeUserInfo; +use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError}; use crate::cache::endpoints::EndpointsCache; use crate::cache::project_info::ProjectInfoCacheImpl; use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions}; use crate::context::RequestContext; use crate::control_plane::{ - errors, CachedAccessBlockerFlags, CachedAllowedIps, CachedAllowedVpcEndpointIds, - CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache, + CachedAccessBlockerFlags, CachedAllowedIps, CachedAllowedVpcEndpointIds, CachedNodeInfo, + CachedRoleSecret, ControlPlaneApi, NodeInfoCache, errors, }; use crate::error::ReportableError; use crate::metrics::ApiLockMetrics; diff --git a/proxy/src/control_plane/errors.rs b/proxy/src/control_plane/errors.rs index d6f565e34a..bc30cffd27 100644 --- a/proxy/src/control_plane/errors.rs +++ b/proxy/src/control_plane/errors.rs @@ -2,7 +2,7 @@ use thiserror::Error; use crate::control_plane::client::ApiLockError; use crate::control_plane::messages::{self, ControlPlaneErrorMessage, Reason}; -use crate::error::{io_error, ErrorKind, ReportableError, UserFacingError}; +use crate::error::{ErrorKind, ReportableError, UserFacingError, io_error}; use crate::proxy::retry::CouldRetry; /// A go-to error message which doesn't leak any detail. diff --git a/proxy/src/control_plane/mgmt.rs b/proxy/src/control_plane/mgmt.rs index 2f7359240d..df31abcc8c 100644 --- a/proxy/src/control_plane/mgmt.rs +++ b/proxy/src/control_plane/mgmt.rs @@ -6,7 +6,7 @@ use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError use pq_proto::{BeMessage, SINGLE_COL_ROWDESC}; use tokio::net::{TcpListener, TcpStream}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, Instrument}; +use tracing::{Instrument, error, info, info_span}; use crate::control_plane::messages::{DatabaseInfo, KickSession}; use crate::waiters::{self, Waiter, Waiters}; diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index 89ec4f9b33..d592223be1 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -11,9 +11,9 @@ pub(crate) mod errors; use std::sync::Arc; +use crate::auth::IpPattern; use crate::auth::backend::jwt::AuthRule; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; -use crate::auth::IpPattern; use crate::cache::project_info::ProjectInfoCacheImpl; use crate::cache::{Cached, TimedLru}; use crate::config::ComputeConfig; diff --git a/proxy/src/http/health_server.rs b/proxy/src/http/health_server.rs index 141f319567..5278fe2a3e 100644 --- a/proxy/src/http/health_server.rs +++ b/proxy/src/http/health_server.rs @@ -9,8 +9,8 @@ use http_utils::json::json_response; use http_utils::{RouterBuilder, RouterService}; use hyper0::header::CONTENT_TYPE; use hyper0::{Body, Request, Response, StatusCode}; -use measured::text::BufferedTextEncoder; use measured::MetricGroup; +use measured::text::BufferedTextEncoder; use metrics::NeonMetrics; use tracing::{info, info_span}; diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index ed88c77256..96f600d836 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -13,8 +13,8 @@ use hyper::body::Body; pub(crate) use reqwest::{Request, Response}; use reqwest_middleware::RequestBuilder; pub(crate) use reqwest_middleware::{ClientWithMiddleware, Error}; -pub(crate) use reqwest_retry::policies::ExponentialBackoff; pub(crate) use reqwest_retry::RetryTransientMiddleware; +pub(crate) use reqwest_retry::policies::ExponentialBackoff; use thiserror::Error; use crate::metrics::{ConsoleRequest, Metrics}; diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index fbd4811b54..3c34918d84 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -8,7 +8,7 @@ use opentelemetry::trace::TraceContextExt; use scopeguard::defer; use serde::ser::{SerializeMap, Serializer}; use tracing::subscriber::Interest; -use tracing::{callsite, span, Event, Metadata, Span, Subscriber}; +use tracing::{Event, Metadata, Span, Subscriber, callsite, span}; use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; use tracing_subscriber::fmt::format::{Format, Full}; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index f3447e063e..db1f096de1 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -543,11 +543,7 @@ impl Drop for LatencyTimer { impl From for Bool { fn from(value: bool) -> Self { - if value { - Bool::True - } else { - Bool::False - } + if value { Bool::True } else { Bool::False } } } diff --git a/proxy/src/protocol2.rs b/proxy/src/protocol2.rs index 99d645878f..41180fa6c1 100644 --- a/proxy/src/protocol2.rs +++ b/proxy/src/protocol2.rs @@ -407,7 +407,7 @@ mod tests { use tokio::io::AsyncReadExt; use crate::protocol2::{ - read_proxy_protocol, ConnectHeader, LOCAL_V2, PROXY_V2, TCP_OVER_IPV4, UDP_OVER_IPV6, + ConnectHeader, LOCAL_V2, PROXY_V2, TCP_OVER_IPV4, UDP_OVER_IPV6, read_proxy_protocol, }; #[tokio::test] diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 26fb1754bf..b8b39fa121 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -5,7 +5,7 @@ use tracing::{debug, info, warn}; use super::retry::ShouldRetryWakeCompute; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; -use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT}; +use crate::compute::{self, COULD_NOT_CONNECT, PostgresConnection}; use crate::config::{ComputeConfig, RetryConfig}; use crate::context::RequestContext; use crate::control_plane::errors::WakeComputeError; @@ -15,7 +15,7 @@ use crate::error::ReportableError; use crate::metrics::{ ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType, }; -use crate::proxy::retry::{retry_after, should_retry, CouldRetry}; +use crate::proxy::retry::{CouldRetry, retry_after, should_retry}; use crate::proxy::wake_compute::wake_compute; use crate::types::Host; diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 861f1766e8..6f8b972348 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -1,7 +1,7 @@ use std::future::poll_fn; use std::io; use std::pin::Pin; -use std::task::{ready, Context, Poll}; +use std::task::{Context, Poll, ready}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tracing::info; diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 49566e5172..0c6d352600 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -9,28 +9,28 @@ pub(crate) mod retry; pub(crate) mod wake_compute; use std::sync::Arc; -pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource}; +pub use copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute}; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use once_cell::sync::OnceCell; use pq_proto::{BeMessage as Be, CancelKeyData, StartupMessageParams}; use regex::Regex; use serde::{Deserialize, Serialize}; -use smol_str::{format_smolstr, SmolStr, ToSmolStr}; +use smol_str::{SmolStr, ToSmolStr, format_smolstr}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, warn, Instrument}; +use tracing::{Instrument, debug, error, info, warn}; -use self::connect_compute::{connect_to_compute, TcpMechanism}; +use self::connect_compute::{TcpMechanism, connect_to_compute}; use self::passthrough::ProxyPassthrough; use crate::cancellation::{self, CancellationHandler}; use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig}; use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::{Metrics, NumClientConnectionsGuard}; -use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo, ConnectionInfoExtra}; -use crate::proxy::handshake::{handshake, HandshakeData}; +use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_proxy_protocol}; +use crate::proxy::handshake::{HandshakeData, handshake}; use crate::rate_limiter::EndpointRateLimiter; use crate::stream::{PqStream, Stream}; use crate::types::EndpointCacheKey; diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index d8c00a9b41..171f539b1e 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -5,12 +5,12 @@ mod mitm; use std::time::Duration; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use async_trait::async_trait; use http::StatusCode; use postgres_client::config::SslMode; use postgres_client::tls::{MakeTlsConnect, NoTls}; -use retry::{retry_after, ShouldRetryWakeCompute}; +use retry::{ShouldRetryWakeCompute, retry_after}; use rstest::rstest; use rustls::crypto::ring; use rustls::pki_types; @@ -334,8 +334,8 @@ async fn scram_auth_mock() -> anyhow::Result<()> { generate_tls_config("generic-project-name.localhost", "localhost")?; let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), Scram::mock())); - use rand::distributions::Alphanumeric; use rand::Rng; + use rand::distributions::Alphanumeric; let password: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(rand::random::() as usize) diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index 4e9206feff..9d8915e24a 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -3,8 +3,8 @@ use tracing::{error, info}; use super::connect_compute::ComputeConnectBackend; use crate::config::RetryConfig; use crate::context::RequestContext; -use crate::control_plane::errors::{ControlPlaneError, WakeComputeError}; use crate::control_plane::CachedNodeInfo; +use crate::control_plane::errors::{ControlPlaneError, WakeComputeError}; use crate::error::ReportableError; use crate::metrics::{ ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType, diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index 9645eaf725..b3853d48e4 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use ahash::RandomState; use clashmap::ClashMap; -use rand::{thread_rng, Rng}; +use rand::{Rng, thread_rng}; use tokio::time::Instant; use tracing::info; use utils::leaky_bucket::LeakyBucketState; diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index b74a9ab17e..f8eeb89f05 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -5,8 +5,8 @@ use std::time::Duration; use parking_lot::Mutex; use tokio::sync::Notify; -use tokio::time::error::Elapsed; use tokio::time::Instant; +use tokio::time::error::Elapsed; use self::aimd::Aimd; diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index ef6c39f230..71e2a92da6 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -1,8 +1,8 @@ use std::borrow::Cow; use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hash}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; +use std::sync::atomic::{AtomicUsize, Ordering}; use anyhow::bail; use clashmap::ClashMap; diff --git a/proxy/src/redis/elasticache.rs b/proxy/src/redis/elasticache.rs index bf6dde9332..58e3c889a7 100644 --- a/proxy/src/redis/elasticache.rs +++ b/proxy/src/redis/elasticache.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; +use aws_config::Region; use aws_config::environment::EnvironmentVariableCredentialsProvider; use aws_config::imds::credentials::ImdsCredentialsProvider; use aws_config::meta::credentials::CredentialsProviderChain; @@ -8,7 +9,6 @@ use aws_config::meta::region::RegionProviderChain; use aws_config::profile::ProfileFileCredentialsProvider; use aws_config::provider_config::ProviderConfig; use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider; -use aws_config::Region; use aws_sdk_iam::config::ProvideCredentials; use aws_sigv4::http_request::{ self, SignableBody, SignableRequest, SignatureLocation, SigningSettings, diff --git a/proxy/src/redis/keys.rs b/proxy/src/redis/keys.rs index dcb9a59f87..7527bca6d0 100644 --- a/proxy/src/redis/keys.rs +++ b/proxy/src/redis/keys.rs @@ -1,7 +1,7 @@ use std::io::ErrorKind; use anyhow::Ok; -use pq_proto::{id_to_cancel_key, CancelKeyData}; +use pq_proto::{CancelKeyData, id_to_cancel_key}; use serde::{Deserialize, Serialize}; pub mod keyspace { diff --git a/proxy/src/sasl/stream.rs b/proxy/src/sasl/stream.rs index ac77556566..46e6a439e5 100644 --- a/proxy/src/sasl/stream.rs +++ b/proxy/src/sasl/stream.rs @@ -5,8 +5,8 @@ use std::io; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::info; -use super::messages::ServerMessage; use super::Mechanism; +use super::messages::ServerMessage; use crate::stream::PqStream; /// Abstracts away all peculiarities of the libpq's protocol. diff --git a/proxy/src/scram/countmin.rs b/proxy/src/scram/countmin.rs index 87ab6e0d5f..9d56c465ec 100644 --- a/proxy/src/scram/countmin.rs +++ b/proxy/src/scram/countmin.rs @@ -90,7 +90,7 @@ mod tests { // number of insert operations let m = rng.gen_range(1..100); - let id = uuid::Builder::from_random_bytes(rng.gen()).into_uuid(); + let id = uuid::Builder::from_random_bytes(rng.r#gen()).into_uuid(); ids.push((id, n, m)); // N = sum(actual) diff --git a/proxy/src/scram/exchange.rs b/proxy/src/scram/exchange.rs index 77853db3db..abd5aeae5b 100644 --- a/proxy/src/scram/exchange.rs +++ b/proxy/src/scram/exchange.rs @@ -5,6 +5,7 @@ use std::convert::Infallible; use hmac::{Hmac, Mac}; use sha2::Sha256; +use super::ScramKey; use super::messages::{ ClientFinalMessage, ClientFirstMessage, OwnedServerFirstMessage, SCRAM_RAW_NONCE_LEN, }; @@ -12,7 +13,6 @@ use super::pbkdf2::Pbkdf2; use super::secret::ServerSecret; use super::signature::SignatureBuilder; use super::threadpool::ThreadPool; -use super::ScramKey; use crate::intern::EndpointIdInt; use crate::sasl::{self, ChannelBinding, Error as SaslError}; @@ -208,8 +208,8 @@ impl sasl::Mechanism for Exchange<'_> { type Output = super::ScramKey; fn exchange(mut self, input: &str) -> sasl::Result> { - use sasl::Step; use ExchangeState; + use sasl::Step; match &self.state { ExchangeState::Initial(init) => { match init.transition(self.secret, &self.tls_server_end_point, input)? { diff --git a/proxy/src/scram/messages.rs b/proxy/src/scram/messages.rs index 0e54e7ded9..7b0b861ce9 100644 --- a/proxy/src/scram/messages.rs +++ b/proxy/src/scram/messages.rs @@ -4,7 +4,7 @@ use std::fmt; use std::ops::Range; use super::base64_decode_array; -use super::key::{ScramKey, SCRAM_KEY_LEN}; +use super::key::{SCRAM_KEY_LEN, ScramKey}; use super::signature::SignatureBuilder; use crate::sasl::ChannelBinding; diff --git a/proxy/src/scram/mod.rs b/proxy/src/scram/mod.rs index cfa571cbe1..24f991d4d9 100644 --- a/proxy/src/scram/mod.rs +++ b/proxy/src/scram/mod.rs @@ -15,7 +15,7 @@ mod secret; mod signature; pub mod threadpool; -pub(crate) use exchange::{exchange, Exchange}; +pub(crate) use exchange::{Exchange, exchange}; use hmac::{Hmac, Mac}; pub(crate) use key::ScramKey; pub(crate) use secret::ServerSecret; diff --git a/proxy/src/scram/signature.rs b/proxy/src/scram/signature.rs index d3255cf2ca..a5b1c3e9f4 100644 --- a/proxy/src/scram/signature.rs +++ b/proxy/src/scram/signature.rs @@ -1,6 +1,6 @@ //! Tools for client/server signature management. -use super::key::{ScramKey, SCRAM_KEY_LEN}; +use super::key::{SCRAM_KEY_LEN, ScramKey}; /// A collection of message parts needed to derive the client's signature. #[derive(Debug)] diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 70dd7bc0e7..72029102e0 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -7,27 +7,27 @@ use ed25519_dalek::SigningKey; use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use jose_jwk::jose_b64; use rand::rngs::OsRng; -use tokio::net::{lookup_host, TcpStream}; +use tokio::net::{TcpStream, lookup_host}; use tracing::field::display; use tracing::{debug, info}; use super::conn_pool::poll_client; use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool}; -use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send}; -use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION}; +use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client}; +use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool}; use crate::auth::backend::local::StaticAuthRules; use crate::auth::backend::{ComputeCredentials, ComputeUserInfo}; -use crate::auth::{self, check_peer_addr_is_in_list, AuthError}; +use crate::auth::{self, AuthError, check_peer_addr_is_in_list}; use crate::compute; use crate::compute_ctl::{ ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest, }; use crate::config::{ComputeConfig, ProxyConfig}; use crate::context::RequestContext; +use crate::control_plane::CachedNodeInfo; use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError}; use crate::control_plane::locks::ApiLocks; -use crate::control_plane::CachedNodeInfo; use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::intern::EndpointIdInt; use crate::protocol2::ConnectionInfoExtra; diff --git a/proxy/src/serverless/cancel_set.rs b/proxy/src/serverless/cancel_set.rs index 6db986f1f7..ba8945afc5 100644 --- a/proxy/src/serverless/cancel_set.rs +++ b/proxy/src/serverless/cancel_set.rs @@ -6,7 +6,7 @@ use std::time::Duration; use indexmap::IndexMap; use parking_lot::Mutex; -use rand::{thread_rng, Rng}; +use rand::{Rng, thread_rng}; use rustc_hash::FxHasher; use tokio::time::Instant; use tokio_util::sync::CancellationToken; @@ -40,7 +40,7 @@ impl CancelSet { pub(crate) fn take(&self) -> Option { for _ in 0..4 { - if let Some(token) = self.take_raw(thread_rng().gen()) { + if let Some(token) = self.take_raw(thread_rng().r#gen()) { return Some(token); } tracing::trace!("failed to get cancel token"); @@ -68,7 +68,7 @@ impl CancelShard { fn take(&mut self, rng: usize) -> Option { NonZeroUsize::new(self.tokens.len()).and_then(|len| { // 10 second grace period so we don't cancel new connections - if self.tokens.get_index(rng % len)?.1 .0.elapsed() < Duration::from_secs(10) { + if self.tokens.get_index(rng % len)?.1.0.elapsed() < Duration::from_secs(10) { return None; } diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 447103edce..6a9089fc2a 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -1,17 +1,17 @@ use std::fmt; use std::pin::pin; use std::sync::{Arc, Weak}; -use std::task::{ready, Poll}; +use std::task::{Poll, ready}; -use futures::future::poll_fn; use futures::Future; -use postgres_client::tls::NoTlsStream; +use futures::future::poll_fn; use postgres_client::AsyncMessage; +use postgres_client::tls::NoTlsStream; use smallvec::SmallVec; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{Instrument, error, info, info_span, warn}; #[cfg(test)] use { super::conn_pool_lib::GlobalConnPoolOptions, diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 9e21491655..933204994b 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -10,7 +10,7 @@ use parking_lot::RwLock; use postgres_client::ReadyForQueryStatus; use rand::Rng; use smol_str::ToSmolStr; -use tracing::{debug, info, Span}; +use tracing::{Span, debug, info}; use super::backend::HttpConnError; use super::conn_pool::ClientDataRemote; diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index fa21f24a1c..338a79b4b3 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -7,7 +7,7 @@ use hyper_util::rt::{TokioExecutor, TokioIo}; use parking_lot::RwLock; use smol_str::ToSmolStr; use tokio::net::TcpStream; -use tracing::{debug, error, info, info_span, Instrument}; +use tracing::{Instrument, debug, error, info, info_span}; use super::backend::HttpConnError; use super::conn_pool_lib::{ diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs index ab012bd020..fbd12ad9cb 100644 --- a/proxy/src/serverless/json.rs +++ b/proxy/src/serverless/json.rs @@ -1,5 +1,5 @@ -use postgres_client::types::{Kind, Type}; use postgres_client::Row; +use postgres_client::types::{Kind, Type}; use serde_json::{Map, Value}; // diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index 137a2d6377..8426a0810e 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -11,24 +11,24 @@ use std::collections::HashMap; use std::pin::pin; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::task::{ready, Poll}; +use std::sync::atomic::AtomicUsize; +use std::task::{Poll, ready}; use std::time::Duration; use ed25519_dalek::{Signature, Signer, SigningKey}; -use futures::future::poll_fn; use futures::Future; +use futures::future::poll_fn; use indexmap::IndexMap; use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding}; use parking_lot::RwLock; -use postgres_client::tls::NoTlsStream; use postgres_client::AsyncMessage; +use postgres_client::tls::NoTlsStream; use serde_json::value::RawValue; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing::{Instrument, debug, error, info, info_span, warn}; use super::backend::HttpConnError; use super::conn_pool_lib::{ @@ -389,6 +389,9 @@ mod tests { // }); // println!("{}", serde_json::to_string(&jwk).unwrap()); - assert_eq!(jwt, "eyJhbGciOiJFZERTQSJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.Cvyc2By33KI0f0obystwdy8PN111L3Sc9_Mr2CU3XshtSqSdxuRxNEZGbb_RvyJf2IzheC_s7aBZ-jLeQ9N0Bg"); + assert_eq!( + jwt, + "eyJhbGciOiJFZERTQSJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.Cvyc2By33KI0f0obystwdy8PN111L3Sc9_Mr2CU3XshtSqSdxuRxNEZGbb_RvyJf2IzheC_s7aBZ-jLeQ9N0Bg" + ); } } diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 8289500159..dd0fb9c5b4 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -15,7 +15,7 @@ mod sql_over_http; mod websocket; use std::net::{IpAddr, SocketAddr}; -use std::pin::{pin, Pin}; +use std::pin::{Pin, pin}; use std::sync::Arc; use anyhow::Context; @@ -23,8 +23,8 @@ use async_trait::async_trait; use atomic_take::AtomicTake; use bytes::Bytes; pub use conn_pool_lib::GlobalConnPoolOptions; -use futures::future::{select, Either}; use futures::TryFutureExt; +use futures::future::{Either, select}; use http::{Method, Response, StatusCode}; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Empty}; @@ -32,23 +32,23 @@ use http_utils::error::ApiError; use hyper::body::Incoming; use hyper_util::rt::TokioExecutor; use hyper_util::server::conn::auto::Builder; -use rand::rngs::StdRng; use rand::SeedableRng; -use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID}; +use rand::rngs::StdRng; +use sql_over_http::{NEON_REQUEST_ID, uuid_to_header_value}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; use tokio::time::timeout; use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; -use tracing::{info, warn, Instrument}; +use tracing::{Instrument, info, warn}; use crate::cancellation::CancellationHandler; use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::context::RequestContext; use crate::ext::TaskExt; use crate::metrics::Metrics; -use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo}; +use crate::protocol2::{ChainRW, ConnectHeader, ConnectionInfo, read_proxy_protocol}; use crate::proxy::run_until_cancelled; use crate::rate_limiter::EndpointRateLimiter; use crate::serverless::backend::PoolingBackend; diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 7c21d90ed8..8babfb5cd2 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -2,23 +2,23 @@ use std::pin::pin; use std::sync::Arc; use bytes::Bytes; -use futures::future::{select, try_join, Either}; +use futures::future::{Either, select, try_join}; use futures::{StreamExt, TryFutureExt}; -use http::header::AUTHORIZATION; use http::Method; +use http::header::AUTHORIZATION; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; use http_utils::error::ApiError; use hyper::body::Incoming; use hyper::http::{HeaderName, HeaderValue}; -use hyper::{header, HeaderMap, Request, Response, StatusCode}; +use hyper::{HeaderMap, Request, Response, StatusCode, header}; use indexmap::IndexMap; use postgres_client::error::{DbError, ErrorPosition, SqlState}; use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction}; use pq_proto::StartupMessageParamsBuilder; use serde::Serialize; -use serde_json::value::RawValue; use serde_json::Value; +use serde_json::value::RawValue; use tokio::time::{self, Instant}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; @@ -31,15 +31,15 @@ use super::conn_pool::{AuthData, ConnInfoWithAuth}; use super::conn_pool_lib::{self, ConnInfo}; use super::error::HttpCodeError; use super::http_util::json_response; -use super::json::{json_to_pg_text, pg_text_row_to_json, JsonConversionError}; +use super::json::{JsonConversionError, json_to_pg_text, pg_text_row_to_json}; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; -use crate::auth::{endpoint_sni, ComputeUserInfoParseError}; +use crate::auth::{ComputeUserInfoParseError, endpoint_sni}; use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig}; use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; -use crate::http::{read_body_with_limit, ReadBodyError}; +use crate::http::{ReadBodyError, read_body_with_limit}; use crate::metrics::{HttpDirection, Metrics}; -use crate::proxy::{run_until_cancelled, NeonOptions}; +use crate::proxy::{NeonOptions, run_until_cancelled}; use crate::serverless::backend::HttpConnError; use crate::types::{DbName, RoleName}; use crate::usage_metrics::{MetricCounter, MetricCounterRecorder, TrafficDirection}; @@ -1021,7 +1021,7 @@ async fn query_to_json( data: QueryData, current_size: &mut usize, parsed_headers: HttpHeaders, -) -> Result<(ReadyForQueryStatus, impl Serialize), SqlOverHttpError> { +) -> Result<(ReadyForQueryStatus, impl Serialize + use), SqlOverHttpError> { let query_start = Instant::now(); let query_params = data.params; diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 585a7d63b2..c4baeeb5cc 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -1,6 +1,6 @@ use std::pin::Pin; use std::sync::Arc; -use std::task::{ready, Context, Poll}; +use std::task::{Context, Poll, ready}; use anyhow::Context as _; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -15,9 +15,9 @@ use tracing::warn; use crate::cancellation::CancellationHandler; use crate::config::ProxyConfig; use crate::context::RequestContext; -use crate::error::{io_error, ReportableError}; +use crate::error::{ReportableError, io_error}; use crate::metrics::Metrics; -use crate::proxy::{handle_client, ClientMode, ErrorSource}; +use crate::proxy::{ClientMode, ErrorSource, handle_client}; use crate::rate_limiter::EndpointRateLimiter; pin_project! { @@ -184,11 +184,11 @@ mod tests { use framed_websockets::WebSocketServer; use futures::{SinkExt, StreamExt}; - use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; + use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use tokio::task::JoinSet; - use tokio_tungstenite::tungstenite::protocol::Role; - use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::WebSocketStream; + use tokio_tungstenite::tungstenite::Message; + use tokio_tungstenite::tungstenite::protocol::Role; use super::WebSocketRw; diff --git a/proxy/src/signals.rs b/proxy/src/signals.rs index 0b675683c0..32b2344a1c 100644 --- a/proxy/src/signals.rs +++ b/proxy/src/signals.rs @@ -12,7 +12,7 @@ pub async fn handle( where F: FnMut(), { - use tokio::signal::unix::{signal, SignalKind}; + use tokio::signal::unix::{SignalKind, signal}; let mut hangup = signal(SignalKind::hangup())?; let mut interrupt = signal(SignalKind::interrupt())?; diff --git a/proxy/src/tls/postgres_rustls.rs b/proxy/src/tls/postgres_rustls.rs index 0ad279b635..f09e916a1d 100644 --- a/proxy/src/tls/postgres_rustls.rs +++ b/proxy/src/tls/postgres_rustls.rs @@ -2,8 +2,8 @@ use std::convert::TryFrom; use std::sync::Arc; use postgres_client::tls::MakeTlsConnect; -use rustls::pki_types::ServerName; use rustls::ClientConfig; +use rustls::pki_types::ServerName; use tokio::io::{AsyncRead, AsyncWrite}; mod private { @@ -15,8 +15,8 @@ mod private { use postgres_client::tls::{ChannelBinding, TlsConnect}; use rustls::pki_types::ServerName; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - use tokio_rustls::client::TlsStream; use tokio_rustls::TlsConnector; + use tokio_rustls::client::TlsStream; use crate::tls::TlsServerEndPoint; diff --git a/proxy/src/tls/server_config.rs b/proxy/src/tls/server_config.rs index 2cc1657eea..903c0b712b 100644 --- a/proxy/src/tls/server_config.rs +++ b/proxy/src/tls/server_config.rs @@ -1,12 +1,12 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use itertools::Itertools; use rustls::crypto::ring::{self, sign}; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use super::{TlsServerEndPoint, PG_ALPN_PROTOCOL}; +use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint}; pub struct TlsConfig { pub config: Arc, diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 6a23f0e129..004d268fa1 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -2,17 +2,17 @@ //! and push them to a HTTP endpoint. use std::borrow::Cow; use std::convert::Infallible; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::time::Duration; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; use chrono::{DateTime, Datelike, Timelike, Utc}; -use clashmap::mapref::entry::Entry; use clashmap::ClashMap; -use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; +use clashmap::mapref::entry::Entry; +use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, EventType, idempotency_key}; use once_cell::sync::Lazy; use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use serde::{Deserialize, Serialize}; @@ -62,11 +62,7 @@ mod none_as_empty_string { d: D, ) -> Result, D::Error> { let s = SmolStr::deserialize(d)?; - if s.is_empty() { - Ok(None) - } else { - Ok(Some(s)) - } + if s.is_empty() { Ok(None) } else { Ok(Some(s)) } } }