diff --git a/.github/actions/allure-report-generate/action.yml b/.github/actions/allure-report-generate/action.yml index d07e3e32e8..b85ca7874d 100644 --- a/.github/actions/allure-report-generate/action.yml +++ b/.github/actions/allure-report-generate/action.yml @@ -38,9 +38,11 @@ runs: # - name: Set variables shell: bash -euxo pipefail {0} + env: + PR_NUMBER: ${{ github.event.pull_request.number }} + BUCKET: neon-github-public-dev run: | - PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true) - if [ "${PR_NUMBER}" != "null" ]; then + if [ -n "${PR_NUMBER}" ]; then BRANCH_OR_PR=pr-${PR_NUMBER} elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || \ [ "${GITHUB_REF_NAME}" = "release-proxy" ] || [ "${GITHUB_REF_NAME}" = "release-compute" ]; then @@ -59,8 +61,6 @@ runs: echo "LOCK_FILE=${LOCK_FILE}" >> $GITHUB_ENV echo "WORKDIR=${WORKDIR}" >> $GITHUB_ENV echo "BUCKET=${BUCKET}" >> $GITHUB_ENV - env: - BUCKET: neon-github-public-dev # TODO: We can replace with a special docker image with Java and Allure pre-installed - uses: actions/setup-java@v4 @@ -80,8 +80,8 @@ runs: rm -f ${ALLURE_ZIP} fi env: - ALLURE_VERSION: 2.27.0 - ALLURE_ZIP_SHA256: b071858fb2fa542c65d8f152c5c40d26267b2dfb74df1f1608a589ecca38e777 + ALLURE_VERSION: 2.32.2 + ALLURE_ZIP_SHA256: 3f28885e2118f6317c92f667eaddcc6491400af1fb9773c1f3797a5fa5174953 - uses: aws-actions/configure-aws-credentials@v4 if: ${{ !cancelled() }} diff --git a/.github/actions/allure-report-store/action.yml b/.github/actions/allure-report-store/action.yml index 8548a886cf..687bfd49af 100644 --- a/.github/actions/allure-report-store/action.yml +++ b/.github/actions/allure-report-store/action.yml @@ -18,9 +18,11 @@ runs: steps: - name: Set variables shell: bash -euxo pipefail {0} + env: + PR_NUMBER: ${{ github.event.pull_request.number }} + REPORT_DIR: ${{ inputs.report-dir }} run: | - PR_NUMBER=$(jq --raw-output .pull_request.number "$GITHUB_EVENT_PATH" || true) - if [ "${PR_NUMBER}" != "null" ]; then + if [ -n "${PR_NUMBER}" ]; then BRANCH_OR_PR=pr-${PR_NUMBER} elif [ "${GITHUB_REF_NAME}" = "main" ] || [ "${GITHUB_REF_NAME}" = "release" ] || \ [ "${GITHUB_REF_NAME}" = "release-proxy" ] || [ "${GITHUB_REF_NAME}" = "release-compute" ]; then @@ -32,8 +34,6 @@ runs: echo "BRANCH_OR_PR=${BRANCH_OR_PR}" >> $GITHUB_ENV echo "REPORT_DIR=${REPORT_DIR}" >> $GITHUB_ENV - env: - REPORT_DIR: ${{ inputs.report-dir }} - uses: aws-actions/configure-aws-credentials@v4 if: ${{ !cancelled() }} diff --git a/.github/actions/run-python-test-set/action.yml b/.github/actions/run-python-test-set/action.yml index 0eddfe5da6..122fe48b68 100644 --- a/.github/actions/run-python-test-set/action.yml +++ b/.github/actions/run-python-test-set/action.yml @@ -236,5 +236,5 @@ runs: uses: ./.github/actions/allure-report-store with: report-dir: /tmp/test_output/allure/results - unique-key: ${{ inputs.build_type }}-${{ inputs.pg_version }} + unique-key: ${{ inputs.build_type }}-${{ inputs.pg_version }}-${{ runner.arch }} aws-oicd-role-arn: ${{ inputs.aws-oicd-role-arn }} diff --git a/Cargo.lock b/Cargo.lock index 038727f1a8..47552174d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1546,6 +1546,17 @@ dependencies = [ "itertools 0.10.5", ] +[[package]] +name = "cron" +version = "0.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5877d3fbf742507b66bc2a1945106bd30dd8504019d596901ddd012a4dd01740" +dependencies = [ + "chrono", + "once_cell", + "winnow", +] + [[package]] name = "crossbeam-channel" version = 
"0.5.8" @@ -6446,6 +6457,7 @@ dependencies = [ "chrono", "clap", "control_plane", + "cron", "diesel", "diesel-async", "diesel_migrations", @@ -8138,9 +8150,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" [[package]] name = "winnow" -version = "0.6.13" +version = "0.6.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59b5e5f6c299a3c7890b876a2a587f3115162487e704907d9b6cd29473052ba1" +checksum = "1e90edd2ac1aa278a5c4599b1d89cf03074b610800f866d4026dc199d7929a28" dependencies = [ "memchr", ] diff --git a/Cargo.toml b/Cargo.toml index 21310ce6ec..e6ca3c982c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ byteorder = "1.4" bytes = "1.9" camino = "1.1.6" cfg-if = "1.0.0" +cron = "0.15" chrono = { version = "0.4", default-features = false, features = ["clock"] } clap = { version = "4.0", features = ["derive", "env"] } clashmap = { version = "1.0", features = ["raw-api"] } diff --git a/libs/proxy/postgres-protocol2/Cargo.toml b/libs/proxy/postgres-protocol2/Cargo.toml index f66a292d5e..7ebb05eec1 100644 --- a/libs/proxy/postgres-protocol2/Cargo.toml +++ b/libs/proxy/postgres-protocol2/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "postgres-protocol2" version = "0.1.0" -edition = "2021" +edition = "2024" license = "MIT/Apache-2.0" [dependencies] diff --git a/libs/proxy/postgres-protocol2/src/authentication/sasl.rs b/libs/proxy/postgres-protocol2/src/authentication/sasl.rs index f2200a40ce..27e05e24ec 100644 --- a/libs/proxy/postgres-protocol2/src/authentication/sasl.rs +++ b/libs/proxy/postgres-protocol2/src/authentication/sasl.rs @@ -1,14 +1,12 @@ //! SASL-based authentication support. +use std::fmt::Write; +use std::{io, iter, mem, str}; + use hmac::{Hmac, Mac}; use rand::{self, Rng}; use sha2::digest::FixedOutput; use sha2::{Digest, Sha256}; -use std::fmt::Write; -use std::io; -use std::iter; -use std::mem; -use std::str; use tokio::task::yield_now; const NONCE_LENGTH: usize = 24; @@ -493,11 +491,9 @@ mod test { let nonce = "9IZ2O01zb9IgiIZ1WJ/zgpJB"; let client_first = "n,,n=,r=9IZ2O01zb9IgiIZ1WJ/zgpJB"; - let server_first = - "r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\ + let server_first = "r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,s=fs3IXBy7U7+IvVjZ,i\ =4096"; - let client_final = - "c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\ + let client_final = "c=biws,r=9IZ2O01zb9IgiIZ1WJ/zgpJBjx/oIRLs02gGSHcw1KEty3eY,p=AmNKosjJzS3\ 1NTlQYNs5BTeQjdHdk7lOflDo5re2an8="; let server_final = "v=U+ppxD5XUKtradnv8e2MkeupiA8FU87Sg8CXzXHDAzw="; diff --git a/libs/proxy/postgres-protocol2/src/lib.rs b/libs/proxy/postgres-protocol2/src/lib.rs index 6032440f9a..afbd1e92bd 100644 --- a/libs/proxy/postgres-protocol2/src/lib.rs +++ b/libs/proxy/postgres-protocol2/src/lib.rs @@ -11,9 +11,10 @@ //! set to `UTF8`. It will most likely not behave properly if that is not the case. 
#![warn(missing_docs, clippy::all)] +use std::io; + use byteorder::{BigEndian, ByteOrder}; use bytes::{BufMut, BytesMut}; -use std::io; pub mod authentication; pub mod escape; diff --git a/libs/proxy/postgres-protocol2/src/message/backend.rs b/libs/proxy/postgres-protocol2/src/message/backend.rs index 097964f9c1..d7eaef9509 100644 --- a/libs/proxy/postgres-protocol2/src/message/backend.rs +++ b/libs/proxy/postgres-protocol2/src/message/backend.rs @@ -1,13 +1,13 @@ #![allow(missing_docs)] +use std::io::{self, Read}; +use std::ops::Range; +use std::{cmp, str}; + use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use bytes::{Bytes, BytesMut}; use fallible_iterator::FallibleIterator; use memchr::memchr; -use std::cmp; -use std::io::{self, Read}; -use std::ops::Range; -use std::str; use crate::Oid; diff --git a/libs/proxy/postgres-protocol2/src/message/frontend.rs b/libs/proxy/postgres-protocol2/src/message/frontend.rs index 640f35ada3..b447290ea8 100644 --- a/libs/proxy/postgres-protocol2/src/message/frontend.rs +++ b/libs/proxy/postgres-protocol2/src/message/frontend.rs @@ -1,13 +1,13 @@ //! Frontend message serialization. #![allow(missing_docs)] +use std::error::Error; +use std::{io, marker}; + use byteorder::{BigEndian, ByteOrder}; use bytes::{Buf, BufMut, BytesMut}; -use std::error::Error; -use std::io; -use std::marker; -use crate::{write_nullable, FromUsize, IsNull, Oid}; +use crate::{FromUsize, IsNull, Oid, write_nullable}; #[inline] fn write_body(buf: &mut BytesMut, f: F) -> Result<(), E> diff --git a/libs/proxy/postgres-protocol2/src/password/mod.rs b/libs/proxy/postgres-protocol2/src/password/mod.rs index 38eb31dfcf..4cd9bfb060 100644 --- a/libs/proxy/postgres-protocol2/src/password/mod.rs +++ b/libs/proxy/postgres-protocol2/src/password/mod.rs @@ -6,12 +6,13 @@ //! side. This is good because it ensures the cleartext password won't //! end up in logs pg_stat displays, etc. -use crate::authentication::sasl; use hmac::{Hmac, Mac}; use rand::RngCore; use sha2::digest::FixedOutput; use sha2::{Digest, Sha256}; +use crate::authentication::sasl; + #[cfg(test)] mod test; diff --git a/libs/proxy/postgres-protocol2/src/types/mod.rs b/libs/proxy/postgres-protocol2/src/types/mod.rs index 78131c05bf..6a9b334bcb 100644 --- a/libs/proxy/postgres-protocol2/src/types/mod.rs +++ b/libs/proxy/postgres-protocol2/src/types/mod.rs @@ -1,11 +1,12 @@ //! Conversions to and from Postgres's binary format for various types. -use byteorder::{BigEndian, ReadBytesExt}; -use bytes::{BufMut, BytesMut}; -use fallible_iterator::FallibleIterator; use std::boxed::Box as StdBox; use std::error::Error; use std::str; +use byteorder::{BigEndian, ReadBytesExt}; +use bytes::{BufMut, BytesMut}; +use fallible_iterator::FallibleIterator; + use crate::Oid; #[cfg(test)] diff --git a/libs/proxy/postgres-types2/Cargo.toml b/libs/proxy/postgres-types2/Cargo.toml index 57efd94cd3..25ad23ba35 100644 --- a/libs/proxy/postgres-types2/Cargo.toml +++ b/libs/proxy/postgres-types2/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "postgres-types2" version = "0.1.0" -edition = "2021" +edition = "2024" license = "MIT/Apache-2.0" [dependencies] diff --git a/libs/proxy/postgres-types2/src/lib.rs b/libs/proxy/postgres-types2/src/lib.rs index d4f3afdfd4..0ccd8c295f 100644 --- a/libs/proxy/postgres-types2/src/lib.rs +++ b/libs/proxy/postgres-types2/src/lib.rs @@ -4,19 +4,18 @@ //! unless you want to define your own `ToSql` or `FromSql` definitions. 
#![warn(clippy::all, missing_docs)] -use fallible_iterator::FallibleIterator; -use postgres_protocol2::types; use std::any::type_name; use std::error::Error; use std::fmt; use std::sync::Arc; -use crate::type_gen::{Inner, Other}; - +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; #[doc(inline)] pub use postgres_protocol2::Oid; +use postgres_protocol2::types; -use bytes::BytesMut; +use crate::type_gen::{Inner, Other}; /// Generates a simple implementation of `ToSql::accepts` which accepts the /// types passed to it. diff --git a/libs/proxy/postgres-types2/src/private.rs b/libs/proxy/postgres-types2/src/private.rs index 774f9a301c..188b982812 100644 --- a/libs/proxy/postgres-types2/src/private.rs +++ b/libs/proxy/postgres-types2/src/private.rs @@ -1,7 +1,9 @@ -use crate::{FromSql, Type}; -pub use bytes::BytesMut; use std::error::Error; +pub use bytes::BytesMut; + +use crate::{FromSql, Type}; + pub fn read_be_i32(buf: &mut &[u8]) -> Result> { if buf.len() < 4 { return Err("invalid buffer size".into()); diff --git a/libs/proxy/tokio-postgres2/Cargo.toml b/libs/proxy/tokio-postgres2/Cargo.toml index 161c6b8309..540876742f 100644 --- a/libs/proxy/tokio-postgres2/Cargo.toml +++ b/libs/proxy/tokio-postgres2/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "tokio-postgres2" version = "0.1.0" -edition = "2021" +edition = "2024" license = "MIT/Apache-2.0" [dependencies] diff --git a/libs/proxy/tokio-postgres2/src/cancel_query.rs b/libs/proxy/tokio-postgres2/src/cancel_query.rs index cddbf16336..b65fb571e6 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_query.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_query.rs @@ -1,10 +1,11 @@ +use std::io; + use tokio::net::TcpStream; use crate::client::SocketConfig; use crate::config::{Host, SslMode}; use crate::tls::MakeTlsConnect; -use crate::{cancel_query_raw, connect_socket, Error}; -use std::io; +use crate::{Error, cancel_query_raw, connect_socket}; pub(crate) async fn cancel_query( config: Option, @@ -22,7 +23,7 @@ where return Err(Error::connect(io::Error::new( io::ErrorKind::InvalidInput, "unknown host", - ))) + ))); } }; diff --git a/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs b/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs index 8c08296435..c720214e9b 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_query_raw.rs @@ -1,10 +1,11 @@ -use crate::config::SslMode; -use crate::tls::TlsConnect; -use crate::{connect_tls, Error}; use bytes::BytesMut; use postgres_protocol2::message::frontend; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; +use crate::config::SslMode; +use crate::tls::TlsConnect; +use crate::{Error, connect_tls}; + pub async fn cancel_query_raw( stream: S, mode: SslMode, diff --git a/libs/proxy/tokio-postgres2/src/cancel_token.rs b/libs/proxy/tokio-postgres2/src/cancel_token.rs index 718f903a92..f6526395ee 100644 --- a/libs/proxy/tokio-postgres2/src/cancel_token.rs +++ b/libs/proxy/tokio-postgres2/src/cancel_token.rs @@ -1,12 +1,12 @@ -use crate::config::SslMode; -use crate::tls::TlsConnect; - -use crate::{cancel_query, client::SocketConfig, tls::MakeTlsConnect}; -use crate::{cancel_query_raw, Error}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use crate::client::SocketConfig; +use crate::config::SslMode; +use crate::tls::{MakeTlsConnect, TlsConnect}; +use crate::{Error, cancel_query, cancel_query_raw}; + /// The capability to request cancellation of in-progress queries on a /// connection. 
#[derive(Clone, Serialize, Deserialize)] diff --git a/libs/proxy/tokio-postgres2/src/client.rs b/libs/proxy/tokio-postgres2/src/client.rs index 46151ab924..39b1db75da 100644 --- a/libs/proxy/tokio-postgres2/src/client.rs +++ b/libs/proxy/tokio-postgres2/src/client.rs @@ -1,31 +1,28 @@ -use crate::codec::{BackendMessages, FrontendMessage}; - -use crate::config::Host; -use crate::config::SslMode; -use crate::connection::{Request, RequestMessages}; - -use crate::query::RowStream; -use crate::simple_query::SimpleQueryStream; - -use crate::types::{Oid, ToSql, Type}; - -use crate::{ - query, simple_query, slice_iter, CancelToken, Error, ReadyForQueryStatus, Row, - SimpleQueryMessage, Statement, Transaction, TransactionBuilder, -}; -use bytes::BytesMut; -use fallible_iterator::FallibleIterator; -use futures_util::{future, ready, TryStreamExt}; -use parking_lot::Mutex; -use postgres_protocol2::message::{backend::Message, frontend}; -use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fmt; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; + +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; +use futures_util::{TryStreamExt, future, ready}; +use parking_lot::Mutex; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; +use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; -use std::time::Duration; +use crate::codec::{BackendMessages, FrontendMessage}; +use crate::config::{Host, SslMode}; +use crate::connection::{Request, RequestMessages}; +use crate::query::RowStream; +use crate::simple_query::SimpleQueryStream; +use crate::types::{Oid, ToSql, Type}; +use crate::{ + CancelToken, Error, ReadyForQueryStatus, Row, SimpleQueryMessage, Statement, Transaction, + TransactionBuilder, query, simple_query, slice_iter, +}; pub struct Responses { receiver: mpsc::Receiver, diff --git a/libs/proxy/tokio-postgres2/src/codec.rs b/libs/proxy/tokio-postgres2/src/codec.rs index 0ec46198ce..f1fd9b47b3 100644 --- a/libs/proxy/tokio-postgres2/src/codec.rs +++ b/libs/proxy/tokio-postgres2/src/codec.rs @@ -1,8 +1,9 @@ +use std::io; + use bytes::{Buf, Bytes, BytesMut}; use fallible_iterator::FallibleIterator; use postgres_protocol2::message::backend; use postgres_protocol2::message::frontend::CopyData; -use std::io; use tokio_util::codec::{Decoder, Encoder}; pub enum FrontendMessage { diff --git a/libs/proxy/tokio-postgres2/src/config.rs b/libs/proxy/tokio-postgres2/src/config.rs index 47cc45ac80..4c25491b67 100644 --- a/libs/proxy/tokio-postgres2/src/config.rs +++ b/libs/proxy/tokio-postgres2/src/config.rs @@ -1,21 +1,19 @@ //! Connection configuration. -use crate::connect::connect; -use crate::connect_raw::connect_raw; -use crate::connect_raw::RawConnection; -use crate::tls::MakeTlsConnect; -use crate::tls::TlsConnect; -use crate::{Client, Connection, Error}; -use postgres_protocol2::message::frontend::StartupMessageParams; -use serde::{Deserialize, Serialize}; -use std::fmt; -use std::str; use std::time::Duration; -use tokio::io::{AsyncRead, AsyncWrite}; +use std::{fmt, str}; pub use postgres_protocol2::authentication::sasl::ScramKeys; +use postgres_protocol2::message::frontend::StartupMessageParams; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; +use crate::connect::connect; +use crate::connect_raw::{RawConnection, connect_raw}; +use crate::tls::{MakeTlsConnect, TlsConnect}; +use crate::{Client, Connection, Error}; + /// TLS configuration. 
#[derive(Debug, Copy, Clone, PartialEq, Eq, Serialize, Deserialize)] #[non_exhaustive] diff --git a/libs/proxy/tokio-postgres2/src/connect.rs b/libs/proxy/tokio-postgres2/src/connect.rs index e0cb69748d..d2bd0dfbcd 100644 --- a/libs/proxy/tokio-postgres2/src/connect.rs +++ b/libs/proxy/tokio-postgres2/src/connect.rs @@ -1,3 +1,7 @@ +use postgres_protocol2::message::backend::Message; +use tokio::net::TcpStream; +use tokio::sync::mpsc; + use crate::client::SocketConfig; use crate::codec::BackendMessage; use crate::config::Host; @@ -5,9 +9,6 @@ use crate::connect_raw::connect_raw; use crate::connect_socket::connect_socket; use crate::tls::{MakeTlsConnect, TlsConnect}; use crate::{Client, Config, Connection, Error, RawConnection}; -use postgres_protocol2::message::backend::Message; -use tokio::net::TcpStream; -use tokio::sync::mpsc; pub async fn connect( mut tls: T, diff --git a/libs/proxy/tokio-postgres2/src/connect_raw.rs b/libs/proxy/tokio-postgres2/src/connect_raw.rs index 66db85e07d..20dc538cf2 100644 --- a/libs/proxy/tokio-postgres2/src/connect_raw.rs +++ b/libs/proxy/tokio-postgres2/src/connect_raw.rs @@ -1,22 +1,24 @@ +use std::collections::HashMap; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; +use futures_util::{Sink, SinkExt, Stream, TryStreamExt, ready}; +use postgres_protocol2::authentication::sasl; +use postgres_protocol2::authentication::sasl::ScramSha256; +use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody}; +use postgres_protocol2::message::frontend; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_util::codec::Framed; + +use crate::Error; use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; use crate::config::{self, AuthKeys, Config}; use crate::connect_tls::connect_tls; use crate::maybe_tls_stream::MaybeTlsStream; use crate::tls::{TlsConnect, TlsStream}; -use crate::Error; -use bytes::BytesMut; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Sink, SinkExt, Stream, TryStreamExt}; -use postgres_protocol2::authentication::sasl; -use postgres_protocol2::authentication::sasl::ScramSha256; -use postgres_protocol2::message::backend::{AuthenticationSaslBody, Message, NoticeResponseBody}; -use postgres_protocol2::message::frontend; -use std::collections::HashMap; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tokio_util::codec::Framed; pub struct StartupStream { inner: Framed, PostgresCodec>, @@ -158,7 +160,7 @@ where | Some(Message::AuthenticationSspi) => { return Err(Error::authentication( "unsupported authentication method".into(), - )) + )); } Some(Message::ErrorResponse(body)) => return Err(Error::db(body)), Some(_) => return Err(Error::unexpected_message()), diff --git a/libs/proxy/tokio-postgres2/src/connect_socket.rs b/libs/proxy/tokio-postgres2/src/connect_socket.rs index 336a13317f..15411f7ef3 100644 --- a/libs/proxy/tokio-postgres2/src/connect_socket.rs +++ b/libs/proxy/tokio-postgres2/src/connect_socket.rs @@ -1,11 +1,13 @@ -use crate::config::Host; -use crate::Error; use std::future::Future; use std::io; use std::time::Duration; + use tokio::net::{self, TcpStream}; use tokio::time; +use crate::Error; +use crate::config::Host; + pub(crate) async fn connect_socket( host: &Host, port: u16, diff --git a/libs/proxy/tokio-postgres2/src/connect_tls.rs b/libs/proxy/tokio-postgres2/src/connect_tls.rs index 64b0b68abc..4dc929a9e2 
100644 --- a/libs/proxy/tokio-postgres2/src/connect_tls.rs +++ b/libs/proxy/tokio-postgres2/src/connect_tls.rs @@ -1,12 +1,13 @@ -use crate::config::SslMode; -use crate::maybe_tls_stream::MaybeTlsStream; -use crate::tls::private::ForcePrivateApi; -use crate::tls::TlsConnect; -use crate::Error; use bytes::BytesMut; use postgres_protocol2::message::frontend; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use crate::Error; +use crate::config::SslMode; +use crate::maybe_tls_stream::MaybeTlsStream; +use crate::tls::TlsConnect; +use crate::tls::private::ForcePrivateApi; + pub async fn connect_tls( mut stream: S, mode: SslMode, @@ -19,7 +20,7 @@ where match mode { SslMode::Disable => return Ok(MaybeTlsStream::Raw(stream)), SslMode::Prefer if !tls.can_connect(ForcePrivateApi) => { - return Ok(MaybeTlsStream::Raw(stream)) + return Ok(MaybeTlsStream::Raw(stream)); } SslMode::Prefer | SslMode::Require => {} } diff --git a/libs/proxy/tokio-postgres2/src/connection.rs b/libs/proxy/tokio-postgres2/src/connection.rs index f478717e0d..60e39b3b44 100644 --- a/libs/proxy/tokio-postgres2/src/connection.rs +++ b/libs/proxy/tokio-postgres2/src/connection.rs @@ -1,22 +1,24 @@ -use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; -use crate::error::DbError; -use crate::maybe_tls_stream::MaybeTlsStream; -use crate::{AsyncMessage, Error, Notification}; -use bytes::BytesMut; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Sink, Stream}; -use log::{info, trace}; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; use std::collections::{HashMap, VecDeque}; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; + +use bytes::BytesMut; +use fallible_iterator::FallibleIterator; +use futures_util::{Sink, Stream, ready}; +use log::{info, trace}; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::sync::mpsc; use tokio_util::codec::Framed; use tokio_util::sync::PollSender; +use crate::codec::{BackendMessage, BackendMessages, FrontendMessage, PostgresCodec}; +use crate::error::DbError; +use crate::maybe_tls_stream::MaybeTlsStream; +use crate::{AsyncMessage, Error, Notification}; + pub enum RequestMessages { Single(FrontendMessage), } @@ -139,7 +141,7 @@ where Some(response) => response, None => match messages.next().map_err(Error::parse)? { Some(Message::ErrorResponse(error)) => { - return Poll::Ready(Err(Error::db(error))) + return Poll::Ready(Err(Error::db(error))); } _ => return Poll::Ready(Err(Error::unexpected_message())), }, diff --git a/libs/proxy/tokio-postgres2/src/error/mod.rs b/libs/proxy/tokio-postgres2/src/error/mod.rs index 922c348525..b12e76e5bf 100644 --- a/libs/proxy/tokio-postgres2/src/error/mod.rs +++ b/libs/proxy/tokio-postgres2/src/error/mod.rs @@ -1,10 +1,10 @@ //! Errors. 
+use std::error::{self, Error as _Error}; +use std::{fmt, io}; + use fallible_iterator::FallibleIterator; use postgres_protocol2::message::backend::{ErrorFields, ErrorResponseBody}; -use std::error::{self, Error as _Error}; -use std::fmt; -use std::io; pub use self::sqlstate::*; diff --git a/libs/proxy/tokio-postgres2/src/generic_client.rs b/libs/proxy/tokio-postgres2/src/generic_client.rs index 042b5a675e..31c3d8fa3e 100644 --- a/libs/proxy/tokio-postgres2/src/generic_client.rs +++ b/libs/proxy/tokio-postgres2/src/generic_client.rs @@ -1,9 +1,10 @@ #![allow(async_fn_in_trait)] +use postgres_protocol2::Oid; + use crate::query::RowStream; use crate::types::Type; use crate::{Client, Error, Transaction}; -use postgres_protocol2::Oid; mod private { pub trait Sealed {} diff --git a/libs/proxy/tokio-postgres2/src/lib.rs b/libs/proxy/tokio-postgres2/src/lib.rs index 7426279167..c8ebba5487 100644 --- a/libs/proxy/tokio-postgres2/src/lib.rs +++ b/libs/proxy/tokio-postgres2/src/lib.rs @@ -1,6 +1,8 @@ //! An asynchronous, pipelined, PostgreSQL client. #![warn(clippy::all)] +use postgres_protocol2::message::backend::ReadyForQueryBody; + pub use crate::cancel_token::CancelToken; pub use crate::client::{Client, SocketConfig}; pub use crate::config::Config; @@ -17,7 +19,6 @@ pub use crate::tls::NoTls; pub use crate::transaction::Transaction; pub use crate::transaction_builder::{IsolationLevel, TransactionBuilder}; use crate::types::ToSql; -use postgres_protocol2::message::backend::ReadyForQueryBody; /// After executing a query, the connection will be in one of these states #[derive(Clone, Copy, Debug, PartialEq)] diff --git a/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs b/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs index 9a7e248997..4aa838613e 100644 --- a/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs +++ b/libs/proxy/tokio-postgres2/src/maybe_tls_stream.rs @@ -1,12 +1,14 @@ //! MaybeTlsStream. //! //! Represents a stream that may or may not be encrypted with TLS. -use crate::tls::{ChannelBinding, TlsStream}; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::tls::{ChannelBinding, TlsStream}; + /// A stream that may or may not be encrypted with TLS. pub enum MaybeTlsStream { /// An unencrypted stream. 
diff --git a/libs/proxy/tokio-postgres2/src/prepare.rs b/libs/proxy/tokio-postgres2/src/prepare.rs index 58bbb26cbc..b36d2e5f74 100644 --- a/libs/proxy/tokio-postgres2/src/prepare.rs +++ b/libs/proxy/tokio-postgres2/src/prepare.rs @@ -1,18 +1,19 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +use bytes::Bytes; +use fallible_iterator::FallibleIterator; +use futures_util::{TryStreamExt, pin_mut}; +use log::debug; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; + use crate::client::InnerClient; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::types::{Field, Kind, Oid, Type}; -use crate::{query, slice_iter}; -use crate::{Column, Error, Statement}; -use bytes::Bytes; -use fallible_iterator::FallibleIterator; -use futures_util::{pin_mut, TryStreamExt}; -use log::debug; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; -use std::future::Future; -use std::pin::Pin; -use std::sync::Arc; +use crate::{Column, Error, Statement, query, slice_iter}; pub(crate) const TYPEINFO_QUERY: &str = "\ SELECT t.typname, t.typtype, t.typelem, r.rngsubtype, t.typbasetype, n.nspname, t.typrelid diff --git a/libs/proxy/tokio-postgres2/src/query.rs b/libs/proxy/tokio-postgres2/src/query.rs index e21631c85d..29f05fba79 100644 --- a/libs/proxy/tokio-postgres2/src/query.rs +++ b/libs/proxy/tokio-postgres2/src/query.rs @@ -1,22 +1,24 @@ -use crate::client::{InnerClient, Responses}; -use crate::codec::FrontendMessage; -use crate::connection::RequestMessages; -use crate::types::IsNull; -use crate::{Column, Error, ReadyForQueryStatus, Row, Statement}; -use bytes::{BufMut, Bytes, BytesMut}; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Stream}; -use log::{debug, log_enabled, Level}; -use pin_project_lite::pin_project; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; -use postgres_types2::{Format, ToSql, Type}; use std::fmt; use std::marker::PhantomPinned; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use bytes::{BufMut, Bytes, BytesMut}; +use fallible_iterator::FallibleIterator; +use futures_util::{Stream, ready}; +use log::{Level, debug, log_enabled}; +use pin_project_lite::pin_project; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; +use postgres_types2::{Format, ToSql, Type}; + +use crate::client::{InnerClient, Responses}; +use crate::codec::FrontendMessage; +use crate::connection::RequestMessages; +use crate::types::IsNull; +use crate::{Column, Error, ReadyForQueryStatus, Row, Statement}; + struct BorrowToSqlParamsDebug<'a>(&'a [&'a (dyn ToSql + Sync)]); impl fmt::Debug for BorrowToSqlParamsDebug<'_> { @@ -257,7 +259,7 @@ impl Stream for RowStream { this.statement.clone(), body, *this.output_format, - )?))) + )?))); } Message::EmptyQueryResponse | Message::PortalSuspended => {} Message::CommandComplete(body) => { diff --git a/libs/proxy/tokio-postgres2/src/row.rs b/libs/proxy/tokio-postgres2/src/row.rs index 10e130707d..5fc955eef4 100644 --- a/libs/proxy/tokio-postgres2/src/row.rs +++ b/libs/proxy/tokio-postgres2/src/row.rs @@ -1,17 +1,18 @@ //! Rows. 
+use std::ops::Range; +use std::sync::Arc; +use std::{fmt, str}; + +use fallible_iterator::FallibleIterator; +use postgres_protocol2::message::backend::DataRowBody; +use postgres_types2::{Format, WrongFormat}; + use crate::row::sealed::{AsName, Sealed}; use crate::simple_query::SimpleColumn; use crate::statement::Column; use crate::types::{FromSql, Type, WrongType}; use crate::{Error, Statement}; -use fallible_iterator::FallibleIterator; -use postgres_protocol2::message::backend::DataRowBody; -use postgres_types2::{Format, WrongFormat}; -use std::fmt; -use std::ops::Range; -use std::str; -use std::sync::Arc; mod sealed { pub trait Sealed {} diff --git a/libs/proxy/tokio-postgres2/src/simple_query.rs b/libs/proxy/tokio-postgres2/src/simple_query.rs index fb2550377b..f13d63983f 100644 --- a/libs/proxy/tokio-postgres2/src/simple_query.rs +++ b/libs/proxy/tokio-postgres2/src/simple_query.rs @@ -1,19 +1,21 @@ -use crate::client::{InnerClient, Responses}; -use crate::codec::FrontendMessage; -use crate::connection::RequestMessages; -use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow}; -use bytes::Bytes; -use fallible_iterator::FallibleIterator; -use futures_util::{ready, Stream}; -use log::debug; -use pin_project_lite::pin_project; -use postgres_protocol2::message::backend::Message; -use postgres_protocol2::message::frontend; use std::marker::PhantomPinned; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use bytes::Bytes; +use fallible_iterator::FallibleIterator; +use futures_util::{Stream, ready}; +use log::debug; +use pin_project_lite::pin_project; +use postgres_protocol2::message::backend::Message; +use postgres_protocol2::message::frontend; + +use crate::client::{InnerClient, Responses}; +use crate::codec::FrontendMessage; +use crate::connection::RequestMessages; +use crate::{Error, ReadyForQueryStatus, SimpleQueryMessage, SimpleQueryRow}; + /// Information about a column of a single query row. 
#[derive(Debug)] pub struct SimpleColumn { diff --git a/libs/proxy/tokio-postgres2/src/statement.rs b/libs/proxy/tokio-postgres2/src/statement.rs index 591872fbc5..e4828db712 100644 --- a/libs/proxy/tokio-postgres2/src/statement.rs +++ b/libs/proxy/tokio-postgres2/src/statement.rs @@ -1,15 +1,14 @@ +use std::fmt; +use std::sync::{Arc, Weak}; + +use postgres_protocol2::Oid; +use postgres_protocol2::message::backend::Field; +use postgres_protocol2::message::frontend; + use crate::client::InnerClient; use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::types::Type; -use postgres_protocol2::{ - message::{backend::Field, frontend}, - Oid, -}; -use std::{ - fmt, - sync::{Arc, Weak}, -}; struct StatementInner { client: Weak, diff --git a/libs/proxy/tokio-postgres2/src/tls.rs b/libs/proxy/tokio-postgres2/src/tls.rs index dc8140719f..41b51368ff 100644 --- a/libs/proxy/tokio-postgres2/src/tls.rs +++ b/libs/proxy/tokio-postgres2/src/tls.rs @@ -5,6 +5,7 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; use std::{fmt, io}; + use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; pub(crate) mod private { diff --git a/libs/proxy/tokio-postgres2/src/transaction.rs b/libs/proxy/tokio-postgres2/src/transaction.rs index 03a57e4947..eecbfc5873 100644 --- a/libs/proxy/tokio-postgres2/src/transaction.rs +++ b/libs/proxy/tokio-postgres2/src/transaction.rs @@ -1,8 +1,9 @@ +use postgres_protocol2::message::frontend; + use crate::codec::FrontendMessage; use crate::connection::RequestMessages; use crate::query::RowStream; use crate::{CancelToken, Client, Error, ReadyForQueryStatus}; -use postgres_protocol2::message::frontend; /// A representation of a PostgreSQL database transaction. /// diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 7285697040..b9b8e32753 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -2097,9 +2097,8 @@ impl PageServerHandler { set_tracing_field_shard_id(&timeline); if timeline.is_archived() == Some(true) { - // TODO after a grace period, turn this log line into a hard error - tracing::warn!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."); - //return Err(QueryError::NotFound("timeline is archived".into())) + tracing::info!("timeline {tenant_id}/{timeline_id} is archived, but got basebackup request for it."); + return Err(QueryError::NotFound("timeline is archived".into())); } let latest_gc_cutoff_lsn = timeline.get_applied_gc_cutoff_lsn(); diff --git a/pageserver/src/tenant.rs b/pageserver/src/tenant.rs index efb35625f2..56718f5294 100644 --- a/pageserver/src/tenant.rs +++ b/pageserver/src/tenant.rs @@ -1189,6 +1189,39 @@ impl Tenant { format!("Failed to load layermap for timeline {tenant_id}/{timeline_id}") })?; + // When unarchiving, we've most likely lost the heatmap generated prior + // to the archival operation. To allow warming this timeline up, generate + // a previous heatmap which contains all visible layers in the layer map. + // This previous heatmap will be used whenever a fresh heatmap is generated + // for the timeline.
+ if matches!(cause, LoadTimelineCause::Unoffload) { + let mut tline_ending_at = Some((&timeline, timeline.get_last_record_lsn())); + while let Some((tline, end_lsn)) = tline_ending_at { + let unarchival_heatmap = tline.generate_unarchival_heatmap(end_lsn).await; + if !tline.is_previous_heatmap_active() { + tline + .previous_heatmap + .store(Some(Arc::new(unarchival_heatmap))); + } else { + tracing::info!("Previous heatmap still active. Dropping unarchival heatmap.") + } + + match tline.ancestor_timeline() { + Some(ancestor) => { + if ancestor.update_layer_visibility().await.is_err() { + // Ancestor timeline is shutting down. + break; + } + + tline_ending_at = Some((ancestor, tline.get_ancestor_lsn())); + } + None => { + tline_ending_at = None; + } + } + } + } + match import_pgdata { Some(import_pgdata) if !import_pgdata.is_done() => { match cause { diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 30de4d90dc..319c5e3d87 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -468,7 +468,7 @@ pub struct Timeline { /// If Some, collects GetPage metadata for an ongoing PageTrace. pub(crate) page_trace: ArcSwapOption>, - previous_heatmap: ArcSwapOption, + pub(super) previous_heatmap: ArcSwapOption, /// May host a background Tokio task which downloads all the layers from the current /// heatmap on demand. @@ -3524,6 +3524,14 @@ impl Timeline { Ok(layer) } + pub(super) fn is_previous_heatmap_active(&self) -> bool { + self.previous_heatmap + .load() + .as_ref() + .map(|prev| matches!(**prev, PreviousHeatmap::Active { .. })) + .unwrap_or(false) + } + /// The timeline heatmap is a hint to secondary locations from the primary location, /// indicating which layers are currently on-disk on the primary. /// @@ -3596,6 +3604,7 @@ impl Timeline { Some(non_resident) => { let mut non_resident = non_resident.peekable(); if non_resident.peek().is_none() { + tracing::info!(timeline_id=%self.timeline_id, "Previous heatmap now obsolete"); self.previous_heatmap .store(Some(PreviousHeatmap::Obsolete.into())); } @@ -3627,6 +3636,36 @@ impl Timeline { Some(HeatMapTimeline::new(self.timeline_id, layers)) } + pub(super) async fn generate_unarchival_heatmap(&self, end_lsn: Lsn) -> PreviousHeatmap { + let guard = self.layers.read().await; + + let now = SystemTime::now(); + let mut heatmap_layers = Vec::default(); + for vl in guard.visible_layers() { + if vl.layer_desc().get_lsn_range().start >= end_lsn { + continue; + } + + let hl = HeatMapLayer { + name: vl.layer_desc().layer_name(), + metadata: vl.metadata(), + access_time: now, + }; + heatmap_layers.push(hl); + } + + tracing::info!( + "Generating unarchival heatmap with {} layers", + heatmap_layers.len() + ); + + let heatmap = HeatMapTimeline::new(self.timeline_id, heatmap_layers); + PreviousHeatmap::Active { + heatmap, + read_at: Instant::now(), + } + } + /// Returns true if the given lsn is or was an ancestor branchpoint. pub(crate) fn is_ancestor_lsn(&self, lsn: Lsn) -> bool { // upon timeline detach, we set the ancestor_lsn to Lsn::INVALID and the store the original diff --git a/pageserver/src/tenant/timeline/compaction.rs b/pageserver/src/tenant/timeline/compaction.rs index 0361ce8cd1..d75591bd74 100644 --- a/pageserver/src/tenant/timeline/compaction.rs +++ b/pageserver/src/tenant/timeline/compaction.rs @@ -1020,7 +1020,7 @@ impl Timeline { /// /// The result may be used as an input to eviction and secondary downloads to de-prioritize layers /// that we know won't be needed for reads. 
- pub(super) async fn update_layer_visibility( + pub(crate) async fn update_layer_visibility( &self, ) -> Result<(), super::layer_manager::Shutdown> { let head_lsn = self.get_last_record_lsn(); diff --git a/pageserver/src/tenant/timeline/layer_manager.rs b/pageserver/src/tenant/timeline/layer_manager.rs index cb7783d779..60e36a5d4d 100644 --- a/pageserver/src/tenant/timeline/layer_manager.rs +++ b/pageserver/src/tenant/timeline/layer_manager.rs @@ -15,8 +15,8 @@ use crate::{ tenant::{ layer_map::{BatchedUpdates, LayerMap}, storage_layer::{ - AsLayerDesc, InMemoryLayer, Layer, PersistentLayerDesc, PersistentLayerKey, - ResidentLayer, + AsLayerDesc, InMemoryLayer, Layer, LayerVisibilityHint, PersistentLayerDesc, + PersistentLayerKey, ResidentLayer, }, }, }; @@ -118,6 +118,12 @@ impl LayerManager { self.layers().values().filter(|l| l.is_likely_resident()) } + pub(crate) fn visible_layers(&self) -> impl Iterator + '_ { + self.layers() + .values() + .filter(|l| l.visibility() == LayerVisibilityHint::Visible) + } + pub(crate) fn contains(&self, layer: &Layer) -> bool { self.contains_key(&layer.layer_desc().key()) } diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index fc1aecd340..f5801b379b 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -716,20 +716,21 @@ retry: if (ret == 0) { - WaitEvent event; + WaitEvent occurred_event; + int noccurred; long timeout; timeout = Max(0, LOG_INTERVAL_MS - INSTR_TIME_GET_MILLISEC(since_last_log)); /* Sleep until there's something to do */ - (void) WaitEventSetWait(shard->wes_read, timeout, &event, 1, - WAIT_EVENT_NEON_PS_READ); + noccurred = WaitEventSetWait(shard->wes_read, timeout, &occurred_event, 1, + WAIT_EVENT_NEON_PS_READ); ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); /* Data available in socket? */ - if (event.events & WL_SOCKET_READABLE) + if (noccurred > 0 && (occurred_event.events & WL_SOCKET_READABLE) != 0) { if (!PQconsumeInput(pageserver_conn)) { diff --git a/poetry.lock b/poetry.lock index d66c3aae7a..ba3b0535e4 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.0.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. 
[[package]] name = "aiohappyeyeballs" @@ -122,7 +122,7 @@ multidict = ">=4.5,<7.0" yarl = ">=1.12.0,<2.0" [package.extras] -speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] +speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""] [[package]] name = "aiopg" @@ -160,30 +160,30 @@ frozenlist = ">=1.1.0" [[package]] name = "allure-pytest" -version = "2.13.2" +version = "2.13.5" description = "Allure pytest integration" optional = false python-versions = "*" groups = ["main"] files = [ - {file = "allure-pytest-2.13.2.tar.gz", hash = "sha256:22243159e8ec81ce2b5254b4013802198821b1b42f118f69d4a289396607c7b3"}, - {file = "allure_pytest-2.13.2-py3-none-any.whl", hash = "sha256:17de9dbee7f61c8e66a5b5e818b00e419dbcea44cb55c24319401ba813220690"}, + {file = "allure-pytest-2.13.5.tar.gz", hash = "sha256:0ef8e1790c44a988db6b83c4d4f5e91451e2c4c8ea10601dfa88528d23afcf6e"}, + {file = "allure_pytest-2.13.5-py3-none-any.whl", hash = "sha256:94130bac32964b78058e62cf4b815ad97a5ac82a065e6dd2d43abac2be7640fc"}, ] [package.dependencies] -allure-python-commons = "2.13.2" +allure-python-commons = "2.13.5" pytest = ">=4.5.0" [[package]] name = "allure-python-commons" -version = "2.13.2" -description = "Common module for integrate allure with python-based frameworks" +version = "2.13.5" +description = "('Contains the API for end users as well as helper functions and classes to build Allure adapters for Python test frameworks',)" optional = false python-versions = ">=3.6" groups = ["main"] files = [ - {file = "allure-python-commons-2.13.2.tar.gz", hash = "sha256:8a03681330231b1deadd86b97ff68841c6591320114ae638570f1ed60d7a2033"}, - {file = "allure_python_commons-2.13.2-py3-none-any.whl", hash = "sha256:2bb3646ec3fbf5b36d178a5e735002bc130ae9f9ba80f080af97d368ba375051"}, + {file = "allure-python-commons-2.13.5.tar.gz", hash = "sha256:a232e7955811f988e49a4c1dd6c16cce7e9b81d0ea0422b1e5654d3254e2caf3"}, + {file = "allure_python_commons-2.13.5-py3-none-any.whl", hash = "sha256:8b0e837b6e32d810adec563f49e1d04127a5b6770e0232065b7cb09b9953980d"}, ] [package.dependencies] @@ -232,7 +232,7 @@ sniffio = ">=1.1" [package.extras] doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphinx-rtd-theme"] -test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"] +test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17) ; platform_python_implementation == \"CPython\" and platform_system != \"Windows\""] trio = ["trio (>=0.23)"] [[package]] @@ -308,8 +308,8 @@ files = [ [package.extras] docs = ["Sphinx (>=8.1.3,<8.2.0)", "sphinx-rtd-theme (>=1.2.2)"] -gssauth = ["gssapi", "sspilib"] -test = ["distro (>=1.9.0,<1.10.0)", "flake8 (>=6.1,<7.0)", "flake8-pyi (>=24.1.0,<24.2.0)", "gssapi", "k5test", "mypy (>=1.8.0,<1.9.0)", "sspilib", "uvloop (>=0.15.3)"] +gssauth = ["gssapi ; platform_system != \"Windows\"", "sspilib ; platform_system == \"Windows\""] +test = ["distro (>=1.9.0,<1.10.0)", "flake8 (>=6.1,<7.0)", "flake8-pyi (>=24.1.0,<24.2.0)", "gssapi ; platform_system == \"Linux\"", "k5test ; platform_system == \"Linux\"", "mypy (>=1.8.0,<1.9.0)", "sspilib ; platform_system == \"Windows\"", "uvloop (>=0.15.3) ; 
platform_system != \"Windows\" and python_version < \"3.14.0\""] [[package]] name = "attrs" @@ -324,10 +324,10 @@ files = [ ] [package.extras] -dev = ["cloudpickle", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "sphinx", "sphinx-notfound-page", "zope.interface"] +dev = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.0.2)", "furo", "hypothesis", "mypy", "pre-commit", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "sphinx", "sphinx-notfound-page", "zope.interface"] docs = ["furo", "sphinx", "sphinx-notfound-page", "zope.interface"] -tests = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "zope.interface"] -tests-no-zope = ["cloudpickle", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"] +tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six", "zope.interface"] +tests-no-zope = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.0.2)", "hypothesis", "mypy", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "six"] [[package]] name = "aws-sam-translator" @@ -1074,10 +1074,10 @@ files = [ cffi = {version = ">=1.12", markers = "platform_python_implementation != \"PyPy\""} [package.extras] -docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=3.0.0)"] +docs = ["sphinx (>=5.3.0)", "sphinx-rtd-theme (>=3.0.0) ; python_version >= \"3.8\""] docstest = ["pyenchant (>=3)", "readme-renderer (>=30.0)", "sphinxcontrib-spelling (>=7.3.1)"] -nox = ["nox (>=2024.4.15)", "nox[uv] (>=2024.3.2)"] -pep8test = ["check-sdist", "click (>=8.0.1)", "mypy (>=1.4)", "ruff (>=0.3.6)"] +nox = ["nox (>=2024.4.15)", "nox[uv] (>=2024.3.2) ; python_version >= \"3.8\""] +pep8test = ["check-sdist ; python_version >= \"3.8\"", "click (>=8.0.1)", "mypy (>=1.4)", "ruff (>=0.3.6)"] sdist = ["build (>=1.0.0)"] ssh = ["bcrypt (>=3.1.5)"] test = ["certifi (>=2024)", "cryptography-vectors (==44.0.1)", "pretend (>=0.7)", "pytest (>=7.4.0)", "pytest-benchmark (>=4.0)", "pytest-cov (>=2.10.1)", "pytest-xdist (>=3.5.0)"] @@ -1359,7 +1359,7 @@ idna = "*" sniffio = "*" [package.extras] -brotli = ["brotli", "brotlicffi"] +brotli = ["brotli ; platform_python_implementation == \"CPython\"", "brotlicffi ; platform_python_implementation != \"CPython\""] cli = ["click (==8.*)", "pygments (==2.*)", "rich (>=10,<14)"] http2 = ["h2 (>=3,<5)"] socks = ["socksio (==1.*)"] @@ -1545,8 +1545,8 @@ files = [ [package.extras] docs = ["jaraco.packaging (>=3.2)", "rst.linker (>=1.9)", "sphinx"] -testing = ["ecdsa", "enum34", "feedparser", "jsonlib", "numpy", "pandas", "pymongo", "pytest (>=3.5,!=3.7.3)", "pytest-black-multipy", "pytest-checkdocs (>=1.2.3)", "pytest-cov", "pytest-flake8 (<1.1.0)", "pytest-flake8 (>=1.1.1)", "scikit-learn", "sqlalchemy"] -testing-libs = ["simplejson", "ujson", "yajl"] +testing = ["ecdsa", "enum34 ; python_version == \"2.7\"", "feedparser", "jsonlib ; python_version == \"2.7\"", "numpy", "pandas", "pymongo", "pytest (>=3.5,!=3.7.3)", "pytest-black-multipy", "pytest-checkdocs (>=1.2.3)", "pytest-cov", "pytest-flake8 (<1.1.0) ; python_version <= \"3.6\"", "pytest-flake8 (>=1.1.1) ; python_version >= \"3.7\"", "scikit-learn", "sqlalchemy"] +testing-libs = ["simplejson", "ujson", "yajl ; python_version 
== \"2.7\""] [[package]] name = "jsonpointer" @@ -1867,7 +1867,7 @@ files = [ [package.extras] develop = ["codecov", "pycodestyle", "pytest (>=4.6)", "pytest-cov", "wheel"] docs = ["sphinx"] -gmpy = ["gmpy2 (>=2.1.0a4)"] +gmpy = ["gmpy2 (>=2.1.0a4) ; platform_python_implementation != \"PyPy\""] tests = ["pytest (>=4.6)"] [[package]] @@ -2330,7 +2330,7 @@ files = [ ] [package.extras] -test = ["enum34", "ipaddress", "mock", "pywin32", "wmi"] +test = ["enum34 ; python_version <= \"3.4\"", "ipaddress ; python_version < \"3.0\"", "mock ; python_version < \"3.0\"", "pywin32 ; sys_platform == \"win32\"", "wmi ; sys_platform == \"win32\""] [[package]] name = "psycopg2-binary" @@ -2456,7 +2456,7 @@ typing-extensions = ">=4.12.2" [package.extras] email = ["email-validator (>=2.0.0)"] -timezone = ["tzdata"] +timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""] [[package]] name = "pydantic-core" @@ -3068,7 +3068,7 @@ requests = ">=2.30.0,<3.0" urllib3 = ">=1.25.10,<3.0" [package.extras] -tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli", "tomli-w", "types-PyYAML", "types-requests"] +tests = ["coverage (>=6.0.0)", "flake8", "mypy", "pytest (>=7.0.0)", "pytest-asyncio", "pytest-cov", "pytest-httpserver", "tomli ; python_version < \"3.11\"", "tomli-w", "types-PyYAML", "types-requests"] [[package]] name = "rfc3339-validator" @@ -3161,7 +3161,7 @@ files = [ [package.extras] docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"] -testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] +testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov ; platform_python_implementation != \"PyPy\"", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] [[package]] name = "six" @@ -3407,8 +3407,8 @@ files = [ ] [package.extras] -brotli = ["brotli (==1.0.9)", "brotli (>=1.0.9)", "brotlicffi (>=0.8.0)", "brotlipy (>=0.6.0)"] -secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"] +brotli = ["brotli (==1.0.9) ; os_name != \"nt\" and python_version < \"3\" and platform_python_implementation == \"CPython\"", "brotli (>=1.0.9) ; 
python_version >= \"3\" and platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; (os_name != \"nt\" or python_version >= \"3\") and platform_python_implementation != \"CPython\"", "brotlipy (>=0.6.0) ; os_name == \"nt\" and python_version < \"3\""] +secure = ["certifi", "cryptography (>=1.3.4)", "idna (>=2.0.0)", "ipaddress ; python_version == \"2.7\"", "pyOpenSSL (>=0.14)", "urllib3-secure-extra"] socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [[package]] @@ -3820,4 +3820,4 @@ cffi = ["cffi (>=1.11)"] [metadata] lock-version = "2.1" python-versions = "^3.11" -content-hash = "00ddc42c32e235b6171845fc066dcab078282ed832cd464d5e8a0afa959dd04a" +content-hash = "9711c5479c867fa614ce3d352f1bbc63dba1cb2376d347f96fbeda6f512ee308" diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 6a381bf094..5964b76ecf 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "proxy" version = "0.1.0" -edition.workspace = true +edition = "2024" license.workspace = true [features] diff --git a/proxy/src/auth/backend/console_redirect.rs b/proxy/src/auth/backend/console_redirect.rs index 7503b4eac9..dd48384c03 100644 --- a/proxy/src/auth/backend/console_redirect.rs +++ b/proxy/src/auth/backend/console_redirect.rs @@ -8,16 +8,16 @@ use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{info, info_span}; use super::ComputeCredentialKeys; -use crate::auth::backend::ComputeUserInfo; use crate::auth::IpPattern; +use crate::auth::backend::ComputeUserInfo; use crate::cache::Cached; use crate::config::AuthenticationConfig; use crate::context::RequestContext; use crate::control_plane::client::cplane_proxy_v1; use crate::control_plane::{self, CachedNodeInfo, NodeInfo}; use crate::error::{ReportableError, UserFacingError}; -use crate::proxy::connect_compute::ComputeConnectBackend; use crate::proxy::NeonOptions; +use crate::proxy::connect_compute::ComputeConnectBackend; use crate::stream::PqStream; use crate::types::RoleName; use crate::{auth, compute, waiters}; diff --git a/proxy/src/auth/backend/jwt.rs b/proxy/src/auth/backend/jwt.rs index 5d032c0deb..942f1e13d1 100644 --- a/proxy/src/auth/backend/jwt.rs +++ b/proxy/src/auth/backend/jwt.rs @@ -6,9 +6,9 @@ use std::time::{Duration, SystemTime}; use arc_swap::ArcSwapOption; use clashmap::ClashMap; use jose_jwk::crypto::KeyInfo; -use reqwest::{redirect, Client}; -use reqwest_retry::policies::ExponentialBackoff; +use reqwest::{Client, redirect}; use reqwest_retry::RetryTransientMiddleware; +use reqwest_retry::policies::ExponentialBackoff; use serde::de::Visitor; use serde::{Deserialize, Deserializer}; use serde_json::value::RawValue; @@ -498,8 +498,8 @@ fn verify_rsa_signature( alg: &jose_jwa::Algorithm, ) -> Result<(), JwtError> { use jose_jwa::{Algorithm, Signing}; - use rsa::pkcs1v15::{Signature, VerifyingKey}; use rsa::RsaPublicKey; + use rsa::pkcs1v15::{Signature, VerifyingKey}; let key = RsaPublicKey::try_from(key).map_err(JwtError::InvalidRsaKey)?; diff --git a/proxy/src/auth/backend/local.rs b/proxy/src/auth/backend/local.rs index d10f0e82b2..9c3a3772cd 100644 --- a/proxy/src/auth/backend/local.rs +++ b/proxy/src/auth/backend/local.rs @@ -8,8 +8,8 @@ use crate::auth::backend::jwt::FetchAuthRulesError; use crate::compute::ConnCfg; use crate::compute_ctl::ComputeCtlApi; use crate::context::RequestContext; -use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo}; use crate::control_plane::NodeInfo; +use crate::control_plane::messages::{ColdStartInfo, EndpointJwksResponse, MetricsAuxInfo}; use 
crate::http; use crate::intern::{BranchIdTag, EndpointIdTag, InternId, ProjectIdTag}; use crate::types::EndpointId; diff --git a/proxy/src/auth/backend/mod.rs b/proxy/src/auth/backend/mod.rs index 8f1625278f..83feed5094 100644 --- a/proxy/src/auth/backend/mod.rs +++ b/proxy/src/auth/backend/mod.rs @@ -18,7 +18,7 @@ use tracing::{debug, info, warn}; use crate::auth::credentials::check_peer_addr_is_in_list; use crate::auth::{ - self, validate_password_and_exchange, AuthError, ComputeUserInfoMaybeEndpoint, IpPattern, + self, AuthError, ComputeUserInfoMaybeEndpoint, IpPattern, validate_password_and_exchange, }; use crate::cache::Cached; use crate::config::AuthenticationConfig; @@ -32,8 +32,8 @@ use crate::control_plane::{ use crate::intern::EndpointIdInt; use crate::metrics::Metrics; use crate::protocol2::ConnectionInfoExtra; -use crate::proxy::connect_compute::ComputeConnectBackend; use crate::proxy::NeonOptions; +use crate::proxy::connect_compute::ComputeConnectBackend; use crate::rate_limiter::{BucketRateLimiter, EndpointRateLimiter}; use crate::stream::Stream; use crate::types::{EndpointCacheKey, EndpointId, RoleName}; @@ -542,7 +542,7 @@ mod tests { use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use super::jwt::JwkCache; - use super::{auth_quirks, AuthRateLimiter}; + use super::{AuthRateLimiter, auth_quirks}; use crate::auth::backend::MaskedIp; use crate::auth::{ComputeUserInfoMaybeEndpoint, IpPattern}; use crate::config::AuthenticationConfig; @@ -553,8 +553,8 @@ mod tests { }; use crate::proxy::NeonOptions; use crate::rate_limiter::{EndpointRateLimiter, RateBucketInfo}; - use crate::scram::threadpool::ThreadPool; use crate::scram::ServerSecret; + use crate::scram::threadpool::ThreadPool; use crate::stream::{PqStream, Stream}; struct Auth { diff --git a/proxy/src/auth/credentials.rs b/proxy/src/auth/credentials.rs index eff49a402a..c1b7718e4f 100644 --- a/proxy/src/auth/credentials.rs +++ b/proxy/src/auth/credentials.rs @@ -197,7 +197,10 @@ impl<'de> serde::de::Deserialize<'de> for IpPattern { type Value = IpPattern; fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(formatter, "comma separated list with ip address, ip address range, or ip address subnet mask") + write!( + formatter, + "comma separated list with ip address, ip address range, or ip address subnet mask" + ) } fn visit_str(self, v: &str) -> Result @@ -252,8 +255,8 @@ fn project_name_valid(name: &str) -> bool { #[cfg(test)] #[expect(clippy::unwrap_used)] mod tests { - use serde_json::json; use ComputeUserInfoParseError::*; + use serde_json::json; use super::*; diff --git a/proxy/src/auth/mod.rs b/proxy/src/auth/mod.rs index 6082695a6b..5670f8e43d 100644 --- a/proxy/src/auth/mod.rs +++ b/proxy/src/auth/mod.rs @@ -5,13 +5,13 @@ pub use backend::Backend; mod credentials; pub(crate) use credentials::{ - check_peer_addr_is_in_list, endpoint_sni, ComputeUserInfoMaybeEndpoint, - ComputeUserInfoParseError, IpPattern, + ComputeUserInfoMaybeEndpoint, ComputeUserInfoParseError, IpPattern, check_peer_addr_is_in_list, + endpoint_sni, }; mod password_hack; -pub(crate) use password_hack::parse_endpoint_param; use password_hack::PasswordHackPayload; +pub(crate) use password_hack::parse_endpoint_param; mod flow; use std::io; diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs index 4ab11f828c..dedd225cba 100644 --- a/proxy/src/binary/local_proxy.rs +++ b/proxy/src/binary/local_proxy.rs @@ -4,7 +4,7 @@ use std::str::FromStr; use std::sync::Arc; use 
std::time::Duration; -use anyhow::{bail, ensure, Context}; +use anyhow::{Context, bail, ensure}; use camino::{Utf8Path, Utf8PathBuf}; use clap::Parser; use compute_api::spec::LocalProxySpec; @@ -19,7 +19,7 @@ use utils::sentry_init::init_sentry; use utils::{pid_file, project_build_tag, project_git_version}; use crate::auth::backend::jwt::JwkCache; -use crate::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP}; +use crate::auth::backend::local::{JWKS_ROLE_MAP, LocalBackend}; use crate::auth::{self}; use crate::cancellation::CancellationHandler; use crate::config::{ diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index 94e771a61c..1aa290399c 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -5,24 +5,24 @@ /// the outside. Similar to an ingress controller for HTTPS. use std::{net::SocketAddr, sync::Arc}; -use anyhow::{anyhow, bail, ensure, Context}; +use anyhow::{Context, anyhow, bail, ensure}; use clap::Arg; -use futures::future::Either; use futures::TryFutureExt; +use futures::future::Either; use itertools::Itertools; use rustls::crypto::ring; use rustls::pki_types::PrivateKeyDer; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; -use tracing::{error, info, Instrument}; +use tracing::{Instrument, error, info}; use utils::project_git_version; use utils::sentry_init::init_sentry; use crate::context::RequestContext; use crate::metrics::{Metrics, ThreadPoolMetrics}; use crate::protocol2::ConnectionInfo; -use crate::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource}; +use crate::proxy::{ErrorSource, copy_bidirectional_client_compute, run_until_cancelled}; use crate::stream::{PqStream, Stream}; use crate::tls::TlsServerEndPoint; diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs index b72799df54..eec0bf8f99 100644 --- a/proxy/src/binary/proxy.rs +++ b/proxy/src/binary/proxy.rs @@ -9,16 +9,16 @@ use remote_storage::RemoteStorageConfig; use tokio::net::TcpListener; use tokio::task::JoinSet; use tokio_util::sync::CancellationToken; -use tracing::{info, warn, Instrument}; +use tracing::{Instrument, info, warn}; use utils::sentry_init::init_sentry; use utils::{project_build_tag, project_git_version}; use crate::auth::backend::jwt::JwkCache; use crate::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned}; -use crate::cancellation::{handle_cancel_messages, CancellationHandler}; +use crate::cancellation::{CancellationHandler, handle_cancel_messages}; use crate::config::{ - self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, - ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, + self, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, ProjectInfoCacheOptions, + ProxyConfig, ProxyProtocolV2, remote_storage_from_toml, }; use crate::context::parquet::ParquetUploadArgs; use crate::http::health_server::AppMetrics; @@ -30,8 +30,8 @@ use crate::redis::connection_with_credentials_provider::ConnectionWithCredential use crate::redis::kv_ops::RedisKVClient; use crate::redis::{elasticache, notifications}; use crate::scram::threadpool::ThreadPool; -use crate::serverless::cancel_set::CancelSet; use crate::serverless::GlobalConnPoolOptions; +use crate::serverless::cancel_set::CancelSet; use crate::tls::client_config::compute_client_config_with_root_certs; use crate::{auth, control_plane, http, serverless, usage_metrics}; @@ -331,7 +331,9 @@ pub async fn run() -> 
anyhow::Result<()> { ), ), (None, None) => { - warn!("irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"); + warn!( + "irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client" + ); None } _ => { diff --git a/proxy/src/cache/project_info.rs b/proxy/src/cache/project_info.rs index 7651eb71a2..e153e9f61f 100644 --- a/proxy/src/cache/project_info.rs +++ b/proxy/src/cache/project_info.rs @@ -1,12 +1,12 @@ use std::collections::HashSet; use std::convert::Infallible; -use std::sync::atomic::AtomicU64; use std::sync::Arc; +use std::sync::atomic::AtomicU64; use std::time::Duration; use async_trait::async_trait; use clashmap::ClashMap; -use rand::{thread_rng, Rng}; +use rand::{Rng, thread_rng}; use smol_str::SmolStr; use tokio::sync::Mutex; use tokio::time::Instant; diff --git a/proxy/src/cache/timed_lru.rs b/proxy/src/cache/timed_lru.rs index 06eaeb9a30..7cfe5100ea 100644 --- a/proxy/src/cache/timed_lru.rs +++ b/proxy/src/cache/timed_lru.rs @@ -11,11 +11,11 @@ use std::time::{Duration, Instant}; // This severely hinders its usage both in terms of creating wrappers and supported key types. // // On the other hand, `hashlink` has good download stats and appears to be maintained. -use hashlink::{linked_hash_map::RawEntryMut, LruCache}; +use hashlink::{LruCache, linked_hash_map::RawEntryMut}; use tracing::debug; use super::common::Cached; -use super::{timed_lru, Cache}; +use super::{Cache, timed_lru}; /// An implementation of timed LRU cache with fixed capacity. /// Key properties: diff --git a/proxy/src/cancellation.rs b/proxy/src/cancellation.rs index 422e6f741d..8263e5aa2a 100644 --- a/proxy/src/cancellation.rs +++ b/proxy/src/cancellation.rs @@ -3,8 +3,8 @@ use std::net::{IpAddr, SocketAddr}; use std::sync::Arc; use ipnet::{IpNet, Ipv4Net, Ipv6Net}; -use postgres_client::tls::MakeTlsConnect; use postgres_client::CancelToken; +use postgres_client::tls::MakeTlsConnect; use pq_proto::CancelKeyData; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -13,7 +13,7 @@ use tokio::sync::{mpsc, oneshot}; use tracing::{debug, info}; use crate::auth::backend::ComputeUserInfo; -use crate::auth::{check_peer_addr_is_in_list, AuthError}; +use crate::auth::{AuthError, check_peer_addr_is_in_list}; use crate::config::ComputeConfig; use crate::context::RequestContext; use crate::control_plane::ControlPlaneApi; diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 460e0cff54..1bcd22e98f 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -2,18 +2,18 @@ use std::str::FromStr; use std::sync::Arc; use std::time::Duration; -use anyhow::{bail, ensure, Context, Ok}; +use anyhow::{Context, Ok, bail, ensure}; use clap::ValueEnum; use remote_storage::RemoteStorageConfig; -use crate::auth::backend::jwt::JwkCache; use crate::auth::backend::AuthRateLimiter; +use crate::auth::backend::jwt::JwkCache; use crate::control_plane::locks::ApiLocks; use crate::rate_limiter::{RateBucketInfo, RateLimitAlgorithm, RateLimiterConfig}; use crate::scram::threadpool::ThreadPool; -use crate::serverless::cancel_set::CancelSet; use crate::serverless::GlobalConnPoolOptions; -pub use crate::tls::server_config::{configure_tls, TlsConfig}; +use crate::serverless::cancel_set::CancelSet; +pub use crate::tls::server_config::{TlsConfig, configure_tls}; use crate::types::Host; pub struct ProxyConfig { @@ -97,8 +97,7 @@ pub struct EndpointCacheConfig { impl EndpointCacheConfig { /// Default options for [`crate::control_plane::NodeInfoCache`]. 
/// Notice that by default the limiter is empty, which means that cache is disabled. - pub const CACHE_DEFAULT_OPTIONS: &'static str = - "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s"; + pub const CACHE_DEFAULT_OPTIONS: &'static str = "initial_batch_size=1000,default_batch_size=10,xread_timeout=5m,stream_name=controlPlane,disable_cache=true,limiter_info=1000@1s,retry_interval=1s"; /// Parse cache options passed via cmdline. /// Example: [`Self::CACHE_DEFAULT_OPTIONS`]. diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index a2e7299d39..4662860b3f 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use futures::{FutureExt, TryFutureExt}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, Instrument}; +use tracing::{Instrument, debug, error, info}; use crate::auth::backend::ConsoleRedirectBackend; use crate::cancellation::CancellationHandler; @@ -11,12 +11,12 @@ use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::{Metrics, NumClientConnectionsGuard}; -use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo}; -use crate::proxy::connect_compute::{connect_to_compute, TcpMechanism}; -use crate::proxy::handshake::{handshake, HandshakeData}; +use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol}; +use crate::proxy::connect_compute::{TcpMechanism, connect_to_compute}; +use crate::proxy::handshake::{HandshakeData, handshake}; use crate::proxy::passthrough::ProxyPassthrough; use crate::proxy::{ - prepare_client_connection, run_until_cancelled, ClientRequestError, ErrorSource, + ClientRequestError, ErrorSource, prepare_client_connection, run_until_cancelled, }; pub async fn task_main( @@ -64,22 +64,34 @@ pub async fn task_main( debug!("healthcheck received"); return; } - Ok((_socket, ConnectHeader::Missing)) if config.proxy_protocol_v2 == ProxyProtocolV2::Required => { + Ok((_socket, ConnectHeader::Missing)) + if config.proxy_protocol_v2 == ProxyProtocolV2::Required => + { error!("missing required proxy protocol header"); return; } - Ok((_socket, ConnectHeader::Proxy(_))) if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => { + Ok((_socket, ConnectHeader::Proxy(_))) + if config.proxy_protocol_v2 == ProxyProtocolV2::Rejected => + { error!("proxy protocol header not supported"); return; } Ok((socket, ConnectHeader::Proxy(info))) => (socket, info), - Ok((socket, ConnectHeader::Missing)) => (socket, ConnectionInfo{ addr: peer_addr, extra: None }), + Ok((socket, ConnectHeader::Missing)) => ( + socket, + ConnectionInfo { + addr: peer_addr, + extra: None, + }, + ), }; match socket.inner.set_nodelay(true) { Ok(()) => {} Err(e) => { - error!("per-client task finished with an error: failed to set socket option: {e:#}"); + error!( + "per-client task finished with an error: failed to set socket option: {e:#}" + ); return; } } @@ -118,10 +130,16 @@ pub async fn task_main( match p.proxy_pass(&config.connect_to_compute).await { Ok(()) => {} Err(ErrorSource::Client(e)) => { - error!(?session_id, "per-client task finished with an IO error from the client: {e:#}"); + error!( + ?session_id, + "per-client task finished with an IO error from the client: {e:#}" + ); } Err(ErrorSource::Compute(e)) => { 
- error!(?session_id, "per-client task finished with an IO error from the compute: {e:#}"); + error!( + ?session_id, + "per-client task finished with an IO error from the compute: {e:#}" + ); } } } diff --git a/proxy/src/context/mod.rs b/proxy/src/context/mod.rs index 3236b2e1bf..74b48a1bea 100644 --- a/proxy/src/context/mod.rs +++ b/proxy/src/context/mod.rs @@ -8,7 +8,7 @@ use pq_proto::StartupMessageParams; use smol_str::SmolStr; use tokio::sync::mpsc; use tracing::field::display; -use tracing::{debug, error, info_span, Span}; +use tracing::{Span, debug, error, info_span}; use try_lock::TryLock; use uuid::Uuid; diff --git a/proxy/src/context/parquet.rs b/proxy/src/context/parquet.rs index 0537ae6a62..f029327266 100644 --- a/proxy/src/context/parquet.rs +++ b/proxy/src/context/parquet.rs @@ -8,7 +8,7 @@ use chrono::{Datelike, Timelike}; use futures::{Stream, StreamExt}; use parquet::basic::Compression; use parquet::file::metadata::RowGroupMetaDataPtr; -use parquet::file::properties::{WriterProperties, WriterPropertiesPtr, DEFAULT_PAGE_SIZE}; +use parquet::file::properties::{DEFAULT_PAGE_SIZE, WriterProperties, WriterPropertiesPtr}; use parquet::file::writer::SerializedFileWriter; use parquet::record::RecordWriter; use pq_proto::StartupMessageParams; @@ -17,10 +17,10 @@ use serde::ser::SerializeMap; use tokio::sync::mpsc; use tokio::time; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, Span}; +use tracing::{Span, debug, info}; use utils::backoff; -use super::{RequestContextInner, LOG_CHAN}; +use super::{LOG_CHAN, RequestContextInner}; use crate::config::remote_storage_from_toml; use crate::context::LOG_CHAN_DISCONNECT; use crate::ext::TaskExt; @@ -425,20 +425,20 @@ mod tests { use futures::{Stream, StreamExt}; use itertools::Itertools; use parquet::basic::{Compression, ZstdLevel}; - use parquet::file::properties::{WriterProperties, DEFAULT_PAGE_SIZE}; + use parquet::file::properties::{DEFAULT_PAGE_SIZE, WriterProperties}; use parquet::file::reader::FileReader; use parquet::file::serialized_reader::SerializedFileReader; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use remote_storage::{ - GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config, DEFAULT_MAX_KEYS_PER_LIST_RESPONSE, DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT, + GenericRemoteStorage, RemoteStorageConfig, RemoteStorageKind, S3Config, }; use tokio::sync::mpsc; use tokio::time; use walkdir::WalkDir; - use super::{worker_inner, ParquetConfig, ParquetUploadArgs, RequestData}; + use super::{ParquetConfig, ParquetUploadArgs, RequestData, worker_inner}; #[derive(Parser)] struct ProxyCliArgs { @@ -514,26 +514,26 @@ mod tests { fn generate_request_data(rng: &mut impl Rng) -> RequestData { RequestData { - session_id: uuid::Builder::from_random_bytes(rng.gen()).into_uuid(), - peer_addr: Ipv4Addr::from(rng.gen::<[u8; 4]>()).to_string(), + session_id: uuid::Builder::from_random_bytes(rng.r#gen()).into_uuid(), + peer_addr: Ipv4Addr::from(rng.r#gen::<[u8; 4]>()).to_string(), timestamp: chrono::DateTime::from_timestamp_millis( rng.gen_range(1703862754..1803862754), ) .unwrap() .naive_utc(), application_name: Some("test".to_owned()), - username: Some(hex::encode(rng.gen::<[u8; 4]>())), - endpoint_id: Some(hex::encode(rng.gen::<[u8; 16]>())), - database: Some(hex::encode(rng.gen::<[u8; 16]>())), - project: Some(hex::encode(rng.gen::<[u8; 16]>())), - branch: Some(hex::encode(rng.gen::<[u8; 16]>())), + username: Some(hex::encode(rng.r#gen::<[u8; 4]>())), + endpoint_id: Some(hex::encode(rng.r#gen::<[u8; 
16]>())), + database: Some(hex::encode(rng.r#gen::<[u8; 16]>())), + project: Some(hex::encode(rng.r#gen::<[u8; 16]>())), + branch: Some(hex::encode(rng.r#gen::<[u8; 16]>())), pg_options: None, auth_method: None, jwt_issuer: None, protocol: ["tcp", "ws", "http"][rng.gen_range(0..3)], region: "us-east-1", error: None, - success: rng.gen(), + success: rng.r#gen(), cold_start_info: "no", duration_us: rng.gen_range(0..30_000_000), disconnect_timestamp: None, diff --git a/proxy/src/control_plane/client/cplane_proxy_v1.rs b/proxy/src/control_plane/client/cplane_proxy_v1.rs index ef6621fc59..977fcf4727 100644 --- a/proxy/src/control_plane/client/cplane_proxy_v1.rs +++ b/proxy/src/control_plane/client/cplane_proxy_v1.rs @@ -3,16 +3,16 @@ use std::sync::Arc; use std::time::Duration; -use ::http::header::AUTHORIZATION; use ::http::HeaderName; +use ::http::header::AUTHORIZATION; use futures::TryFutureExt; use postgres_client::config::SslMode; use tokio::time::Instant; -use tracing::{debug, info, info_span, warn, Instrument}; +use tracing::{Instrument, debug, info, info_span, warn}; use super::super::messages::{ControlPlaneErrorMessage, GetEndpointAccessControl, WakeCompute}; -use crate::auth::backend::jwt::AuthRule; use crate::auth::backend::ComputeUserInfo; +use crate::auth::backend::jwt::AuthRule; use crate::cache::Cached; use crate::context::RequestContext; use crate::control_plane::caches::ApiCaches; diff --git a/proxy/src/control_plane/client/mock.rs b/proxy/src/control_plane/client/mock.rs index 1e6cde8fb0..7da5464aa5 100644 --- a/proxy/src/control_plane/client/mock.rs +++ b/proxy/src/control_plane/client/mock.rs @@ -6,11 +6,11 @@ use std::sync::Arc; use futures::TryFutureExt; use thiserror::Error; use tokio_postgres::Client; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{Instrument, error, info, info_span, warn}; -use crate::auth::backend::jwt::AuthRule; -use crate::auth::backend::ComputeUserInfo; use crate::auth::IpPattern; +use crate::auth::backend::ComputeUserInfo; +use crate::auth::backend::jwt::AuthRule; use crate::cache::Cached; use crate::context::RequestContext; use crate::control_plane::client::{ diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index c28ff4789d..746595de38 100644 --- a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -10,15 +10,15 @@ use clashmap::ClashMap; use tokio::time::Instant; use tracing::{debug, info}; -use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError}; use crate::auth::backend::ComputeUserInfo; +use crate::auth::backend::jwt::{AuthRule, FetchAuthRules, FetchAuthRulesError}; use crate::cache::endpoints::EndpointsCache; use crate::cache::project_info::ProjectInfoCacheImpl; use crate::config::{CacheOptions, EndpointCacheConfig, ProjectInfoCacheOptions}; use crate::context::RequestContext; use crate::control_plane::{ - errors, CachedAccessBlockerFlags, CachedAllowedIps, CachedAllowedVpcEndpointIds, - CachedNodeInfo, CachedRoleSecret, ControlPlaneApi, NodeInfoCache, + CachedAccessBlockerFlags, CachedAllowedIps, CachedAllowedVpcEndpointIds, CachedNodeInfo, + CachedRoleSecret, ControlPlaneApi, NodeInfoCache, errors, }; use crate::error::ReportableError; use crate::metrics::ApiLockMetrics; diff --git a/proxy/src/control_plane/errors.rs b/proxy/src/control_plane/errors.rs index d6f565e34a..bc30cffd27 100644 --- a/proxy/src/control_plane/errors.rs +++ b/proxy/src/control_plane/errors.rs @@ -2,7 +2,7 @@ use thiserror::Error; use 
crate::control_plane::client::ApiLockError; use crate::control_plane::messages::{self, ControlPlaneErrorMessage, Reason}; -use crate::error::{io_error, ErrorKind, ReportableError, UserFacingError}; +use crate::error::{ErrorKind, ReportableError, UserFacingError, io_error}; use crate::proxy::retry::CouldRetry; /// A go-to error message which doesn't leak any detail. diff --git a/proxy/src/control_plane/mgmt.rs b/proxy/src/control_plane/mgmt.rs index 2f7359240d..df31abcc8c 100644 --- a/proxy/src/control_plane/mgmt.rs +++ b/proxy/src/control_plane/mgmt.rs @@ -6,7 +6,7 @@ use postgres_backend::{AuthType, PostgresBackend, PostgresBackendTCP, QueryError use pq_proto::{BeMessage, SINGLE_COL_ROWDESC}; use tokio::net::{TcpListener, TcpStream}; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, Instrument}; +use tracing::{Instrument, error, info, info_span}; use crate::control_plane::messages::{DatabaseInfo, KickSession}; use crate::waiters::{self, Waiter, Waiters}; diff --git a/proxy/src/control_plane/mod.rs b/proxy/src/control_plane/mod.rs index 89ec4f9b33..d592223be1 100644 --- a/proxy/src/control_plane/mod.rs +++ b/proxy/src/control_plane/mod.rs @@ -11,9 +11,9 @@ pub(crate) mod errors; use std::sync::Arc; +use crate::auth::IpPattern; use crate::auth::backend::jwt::AuthRule; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; -use crate::auth::IpPattern; use crate::cache::project_info::ProjectInfoCacheImpl; use crate::cache::{Cached, TimedLru}; use crate::config::ComputeConfig; diff --git a/proxy/src/http/health_server.rs b/proxy/src/http/health_server.rs index 141f319567..5278fe2a3e 100644 --- a/proxy/src/http/health_server.rs +++ b/proxy/src/http/health_server.rs @@ -9,8 +9,8 @@ use http_utils::json::json_response; use http_utils::{RouterBuilder, RouterService}; use hyper0::header::CONTENT_TYPE; use hyper0::{Body, Request, Response, StatusCode}; -use measured::text::BufferedTextEncoder; use measured::MetricGroup; +use measured::text::BufferedTextEncoder; use metrics::NeonMetrics; use tracing::{info, info_span}; diff --git a/proxy/src/http/mod.rs b/proxy/src/http/mod.rs index ed88c77256..96f600d836 100644 --- a/proxy/src/http/mod.rs +++ b/proxy/src/http/mod.rs @@ -13,8 +13,8 @@ use hyper::body::Body; pub(crate) use reqwest::{Request, Response}; use reqwest_middleware::RequestBuilder; pub(crate) use reqwest_middleware::{ClientWithMiddleware, Error}; -pub(crate) use reqwest_retry::policies::ExponentialBackoff; pub(crate) use reqwest_retry::RetryTransientMiddleware; +pub(crate) use reqwest_retry::policies::ExponentialBackoff; use thiserror::Error; use crate::metrics::{ConsoleRequest, Metrics}; diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index fbd4811b54..3c34918d84 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -8,7 +8,7 @@ use opentelemetry::trace::TraceContextExt; use scopeguard::defer; use serde::ser::{SerializeMap, Serializer}; use tracing::subscriber::Interest; -use tracing::{callsite, span, Event, Metadata, Span, Subscriber}; +use tracing::{Event, Metadata, Span, Subscriber, callsite, span}; use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::filter::{EnvFilter, LevelFilter}; use tracing_subscriber::fmt::format::{Format, Full}; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index f3447e063e..db1f096de1 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -543,11 +543,7 @@ impl Drop for LatencyTimer { impl From<bool> for Bool { fn from(value: bool) -> Self { - if value { - Bool::True
- } else { - Bool::False - } + if value { Bool::True } else { Bool::False } } } diff --git a/proxy/src/protocol2.rs b/proxy/src/protocol2.rs index 99d645878f..41180fa6c1 100644 --- a/proxy/src/protocol2.rs +++ b/proxy/src/protocol2.rs @@ -407,7 +407,7 @@ mod tests { use tokio::io::AsyncReadExt; use crate::protocol2::{ - read_proxy_protocol, ConnectHeader, LOCAL_V2, PROXY_V2, TCP_OVER_IPV4, UDP_OVER_IPV6, + ConnectHeader, LOCAL_V2, PROXY_V2, TCP_OVER_IPV4, UDP_OVER_IPV6, read_proxy_protocol, }; #[tokio::test] diff --git a/proxy/src/proxy/connect_compute.rs b/proxy/src/proxy/connect_compute.rs index 26fb1754bf..b8b39fa121 100644 --- a/proxy/src/proxy/connect_compute.rs +++ b/proxy/src/proxy/connect_compute.rs @@ -5,7 +5,7 @@ use tracing::{debug, info, warn}; use super::retry::ShouldRetryWakeCompute; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; -use crate::compute::{self, PostgresConnection, COULD_NOT_CONNECT}; +use crate::compute::{self, COULD_NOT_CONNECT, PostgresConnection}; use crate::config::{ComputeConfig, RetryConfig}; use crate::context::RequestContext; use crate::control_plane::errors::WakeComputeError; @@ -15,7 +15,7 @@ use crate::error::ReportableError; use crate::metrics::{ ConnectOutcome, ConnectionFailureKind, Metrics, RetriesMetricGroup, RetryType, }; -use crate::proxy::retry::{retry_after, should_retry, CouldRetry}; +use crate::proxy::retry::{CouldRetry, retry_after, should_retry}; use crate::proxy::wake_compute::wake_compute; use crate::types::Host; diff --git a/proxy/src/proxy/copy_bidirectional.rs b/proxy/src/proxy/copy_bidirectional.rs index 861f1766e8..6f8b972348 100644 --- a/proxy/src/proxy/copy_bidirectional.rs +++ b/proxy/src/proxy/copy_bidirectional.rs @@ -1,7 +1,7 @@ use std::future::poll_fn; use std::io; use std::pin::Pin; -use std::task::{ready, Context, Poll}; +use std::task::{Context, Poll, ready}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tracing::info; diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 49566e5172..0c6d352600 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -9,28 +9,28 @@ pub(crate) mod retry; pub(crate) mod wake_compute; use std::sync::Arc; -pub use copy_bidirectional::{copy_bidirectional_client_compute, ErrorSource}; +pub use copy_bidirectional::{ErrorSource, copy_bidirectional_client_compute}; use futures::{FutureExt, TryFutureExt}; use itertools::Itertools; use once_cell::sync::OnceCell; use pq_proto::{BeMessage as Be, CancelKeyData, StartupMessageParams}; use regex::Regex; use serde::{Deserialize, Serialize}; -use smol_str::{format_smolstr, SmolStr, ToSmolStr}; +use smol_str::{SmolStr, ToSmolStr, format_smolstr}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, warn, Instrument}; +use tracing::{Instrument, debug, error, info, warn}; -use self::connect_compute::{connect_to_compute, TcpMechanism}; +use self::connect_compute::{TcpMechanism, connect_to_compute}; use self::passthrough::ProxyPassthrough; use crate::cancellation::{self, CancellationHandler}; use crate::config::{ProxyConfig, ProxyProtocolV2, TlsConfig}; use crate::context::RequestContext; use crate::error::ReportableError; use crate::metrics::{Metrics, NumClientConnectionsGuard}; -use crate::protocol2::{read_proxy_protocol, ConnectHeader, ConnectionInfo, ConnectionInfoExtra}; -use crate::proxy::handshake::{handshake, HandshakeData}; +use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, 
read_proxy_protocol}; +use crate::proxy::handshake::{HandshakeData, handshake}; use crate::rate_limiter::EndpointRateLimiter; use crate::stream::{PqStream, Stream}; use crate::types::EndpointCacheKey; diff --git a/proxy/src/proxy/tests/mod.rs b/proxy/src/proxy/tests/mod.rs index d8c00a9b41..171f539b1e 100644 --- a/proxy/src/proxy/tests/mod.rs +++ b/proxy/src/proxy/tests/mod.rs @@ -5,12 +5,12 @@ mod mitm; use std::time::Duration; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use async_trait::async_trait; use http::StatusCode; use postgres_client::config::SslMode; use postgres_client::tls::{MakeTlsConnect, NoTls}; -use retry::{retry_after, ShouldRetryWakeCompute}; +use retry::{ShouldRetryWakeCompute, retry_after}; use rstest::rstest; use rustls::crypto::ring; use rustls::pki_types; @@ -334,8 +334,8 @@ async fn scram_auth_mock() -> anyhow::Result<()> { generate_tls_config("generic-project-name.localhost", "localhost")?; let proxy = tokio::spawn(dummy_proxy(client, Some(server_config), Scram::mock())); - use rand::distributions::Alphanumeric; use rand::Rng; + use rand::distributions::Alphanumeric; let password: String = rand::thread_rng() .sample_iter(&Alphanumeric) .take(rand::random::() as usize) diff --git a/proxy/src/proxy/wake_compute.rs b/proxy/src/proxy/wake_compute.rs index 4e9206feff..9d8915e24a 100644 --- a/proxy/src/proxy/wake_compute.rs +++ b/proxy/src/proxy/wake_compute.rs @@ -3,8 +3,8 @@ use tracing::{error, info}; use super::connect_compute::ComputeConnectBackend; use crate::config::RetryConfig; use crate::context::RequestContext; -use crate::control_plane::errors::{ControlPlaneError, WakeComputeError}; use crate::control_plane::CachedNodeInfo; +use crate::control_plane::errors::{ControlPlaneError, WakeComputeError}; use crate::error::ReportableError; use crate::metrics::{ ConnectOutcome, ConnectionFailuresBreakdownGroup, Metrics, RetriesMetricGroup, RetryType, diff --git a/proxy/src/rate_limiter/leaky_bucket.rs b/proxy/src/rate_limiter/leaky_bucket.rs index 9645eaf725..b3853d48e4 100644 --- a/proxy/src/rate_limiter/leaky_bucket.rs +++ b/proxy/src/rate_limiter/leaky_bucket.rs @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use ahash::RandomState; use clashmap::ClashMap; -use rand::{thread_rng, Rng}; +use rand::{Rng, thread_rng}; use tokio::time::Instant; use tracing::info; use utils::leaky_bucket::LeakyBucketState; diff --git a/proxy/src/rate_limiter/limit_algorithm.rs b/proxy/src/rate_limiter/limit_algorithm.rs index b74a9ab17e..f8eeb89f05 100644 --- a/proxy/src/rate_limiter/limit_algorithm.rs +++ b/proxy/src/rate_limiter/limit_algorithm.rs @@ -5,8 +5,8 @@ use std::time::Duration; use parking_lot::Mutex; use tokio::sync::Notify; -use tokio::time::error::Elapsed; use tokio::time::Instant; +use tokio::time::error::Elapsed; use self::aimd::Aimd; diff --git a/proxy/src/rate_limiter/limiter.rs b/proxy/src/rate_limiter/limiter.rs index ef6c39f230..71e2a92da6 100644 --- a/proxy/src/rate_limiter/limiter.rs +++ b/proxy/src/rate_limiter/limiter.rs @@ -1,8 +1,8 @@ use std::borrow::Cow; use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hash}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Mutex; +use std::sync::atomic::{AtomicUsize, Ordering}; use anyhow::bail; use clashmap::ClashMap; diff --git a/proxy/src/redis/elasticache.rs b/proxy/src/redis/elasticache.rs index bf6dde9332..58e3c889a7 100644 --- a/proxy/src/redis/elasticache.rs +++ b/proxy/src/redis/elasticache.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use 
std::time::{Duration, SystemTime}; +use aws_config::Region; use aws_config::environment::EnvironmentVariableCredentialsProvider; use aws_config::imds::credentials::ImdsCredentialsProvider; use aws_config::meta::credentials::CredentialsProviderChain; @@ -8,7 +9,6 @@ use aws_config::meta::region::RegionProviderChain; use aws_config::profile::ProfileFileCredentialsProvider; use aws_config::provider_config::ProviderConfig; use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider; -use aws_config::Region; use aws_sdk_iam::config::ProvideCredentials; use aws_sigv4::http_request::{ self, SignableBody, SignableRequest, SignatureLocation, SigningSettings, diff --git a/proxy/src/redis/keys.rs b/proxy/src/redis/keys.rs index dcb9a59f87..7527bca6d0 100644 --- a/proxy/src/redis/keys.rs +++ b/proxy/src/redis/keys.rs @@ -1,7 +1,7 @@ use std::io::ErrorKind; use anyhow::Ok; -use pq_proto::{id_to_cancel_key, CancelKeyData}; +use pq_proto::{CancelKeyData, id_to_cancel_key}; use serde::{Deserialize, Serialize}; pub mod keyspace { diff --git a/proxy/src/sasl/stream.rs b/proxy/src/sasl/stream.rs index ac77556566..46e6a439e5 100644 --- a/proxy/src/sasl/stream.rs +++ b/proxy/src/sasl/stream.rs @@ -5,8 +5,8 @@ use std::io; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::info; -use super::messages::ServerMessage; use super::Mechanism; +use super::messages::ServerMessage; use crate::stream::PqStream; /// Abstracts away all peculiarities of the libpq's protocol. diff --git a/proxy/src/scram/countmin.rs b/proxy/src/scram/countmin.rs index 87ab6e0d5f..9d56c465ec 100644 --- a/proxy/src/scram/countmin.rs +++ b/proxy/src/scram/countmin.rs @@ -90,7 +90,7 @@ mod tests { // number of insert operations let m = rng.gen_range(1..100); - let id = uuid::Builder::from_random_bytes(rng.gen()).into_uuid(); + let id = uuid::Builder::from_random_bytes(rng.r#gen()).into_uuid(); ids.push((id, n, m)); // N = sum(actual) diff --git a/proxy/src/scram/exchange.rs b/proxy/src/scram/exchange.rs index 77853db3db..abd5aeae5b 100644 --- a/proxy/src/scram/exchange.rs +++ b/proxy/src/scram/exchange.rs @@ -5,6 +5,7 @@ use std::convert::Infallible; use hmac::{Hmac, Mac}; use sha2::Sha256; +use super::ScramKey; use super::messages::{ ClientFinalMessage, ClientFirstMessage, OwnedServerFirstMessage, SCRAM_RAW_NONCE_LEN, }; @@ -12,7 +13,6 @@ use super::pbkdf2::Pbkdf2; use super::secret::ServerSecret; use super::signature::SignatureBuilder; use super::threadpool::ThreadPool; -use super::ScramKey; use crate::intern::EndpointIdInt; use crate::sasl::{self, ChannelBinding, Error as SaslError}; @@ -208,8 +208,8 @@ impl sasl::Mechanism for Exchange<'_> { type Output = super::ScramKey; fn exchange(mut self, input: &str) -> sasl::Result> { - use sasl::Step; use ExchangeState; + use sasl::Step; match &self.state { ExchangeState::Initial(init) => { match init.transition(self.secret, &self.tls_server_end_point, input)? 
{ diff --git a/proxy/src/scram/messages.rs b/proxy/src/scram/messages.rs index 0e54e7ded9..7b0b861ce9 100644 --- a/proxy/src/scram/messages.rs +++ b/proxy/src/scram/messages.rs @@ -4,7 +4,7 @@ use std::fmt; use std::ops::Range; use super::base64_decode_array; -use super::key::{ScramKey, SCRAM_KEY_LEN}; +use super::key::{SCRAM_KEY_LEN, ScramKey}; use super::signature::SignatureBuilder; use crate::sasl::ChannelBinding; diff --git a/proxy/src/scram/mod.rs b/proxy/src/scram/mod.rs index cfa571cbe1..24f991d4d9 100644 --- a/proxy/src/scram/mod.rs +++ b/proxy/src/scram/mod.rs @@ -15,7 +15,7 @@ mod secret; mod signature; pub mod threadpool; -pub(crate) use exchange::{exchange, Exchange}; +pub(crate) use exchange::{Exchange, exchange}; use hmac::{Hmac, Mac}; pub(crate) use key::ScramKey; pub(crate) use secret::ServerSecret; diff --git a/proxy/src/scram/signature.rs b/proxy/src/scram/signature.rs index d3255cf2ca..a5b1c3e9f4 100644 --- a/proxy/src/scram/signature.rs +++ b/proxy/src/scram/signature.rs @@ -1,6 +1,6 @@ //! Tools for client/server signature management. -use super::key::{ScramKey, SCRAM_KEY_LEN}; +use super::key::{SCRAM_KEY_LEN, ScramKey}; /// A collection of message parts needed to derive the client's signature. #[derive(Debug)] diff --git a/proxy/src/serverless/backend.rs b/proxy/src/serverless/backend.rs index 70dd7bc0e7..72029102e0 100644 --- a/proxy/src/serverless/backend.rs +++ b/proxy/src/serverless/backend.rs @@ -7,27 +7,27 @@ use ed25519_dalek::SigningKey; use hyper_util::rt::{TokioExecutor, TokioIo, TokioTimer}; use jose_jwk::jose_b64; use rand::rngs::OsRng; -use tokio::net::{lookup_host, TcpStream}; +use tokio::net::{TcpStream, lookup_host}; use tracing::field::display; use tracing::{debug, info}; use super::conn_pool::poll_client; use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool}; -use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send}; -use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION}; +use super::http_conn_pool::{self, HttpConnPool, Send, poll_http2_client}; +use super::local_conn_pool::{self, EXT_NAME, EXT_SCHEMA, EXT_VERSION, LocalConnPool}; use crate::auth::backend::local::StaticAuthRules; use crate::auth::backend::{ComputeCredentials, ComputeUserInfo}; -use crate::auth::{self, check_peer_addr_is_in_list, AuthError}; +use crate::auth::{self, AuthError, check_peer_addr_is_in_list}; use crate::compute; use crate::compute_ctl::{ ComputeCtlError, ExtensionInstallRequest, Privilege, SetRoleGrantsRequest, }; use crate::config::{ComputeConfig, ProxyConfig}; use crate::context::RequestContext; +use crate::control_plane::CachedNodeInfo; use crate::control_plane::client::ApiLockError; use crate::control_plane::errors::{GetAuthInfoError, WakeComputeError}; use crate::control_plane::locks::ApiLocks; -use crate::control_plane::CachedNodeInfo; use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::intern::EndpointIdInt; use crate::protocol2::ConnectionInfoExtra; diff --git a/proxy/src/serverless/cancel_set.rs b/proxy/src/serverless/cancel_set.rs index 6db986f1f7..ba8945afc5 100644 --- a/proxy/src/serverless/cancel_set.rs +++ b/proxy/src/serverless/cancel_set.rs @@ -6,7 +6,7 @@ use std::time::Duration; use indexmap::IndexMap; use parking_lot::Mutex; -use rand::{thread_rng, Rng}; +use rand::{Rng, thread_rng}; use rustc_hash::FxHasher; use tokio::time::Instant; use tokio_util::sync::CancellationToken; @@ -40,7 +40,7 @@ impl CancelSet { pub(crate) fn take(&self) -> Option { for _ in 
0..4 { - if let Some(token) = self.take_raw(thread_rng().gen()) { + if let Some(token) = self.take_raw(thread_rng().r#gen()) { return Some(token); } tracing::trace!("failed to get cancel token"); @@ -68,7 +68,7 @@ impl CancelShard { fn take(&mut self, rng: usize) -> Option { NonZeroUsize::new(self.tokens.len()).and_then(|len| { // 10 second grace period so we don't cancel new connections - if self.tokens.get_index(rng % len)?.1 .0.elapsed() < Duration::from_secs(10) { + if self.tokens.get_index(rng % len)?.1.0.elapsed() < Duration::from_secs(10) { return None; } diff --git a/proxy/src/serverless/conn_pool.rs b/proxy/src/serverless/conn_pool.rs index 447103edce..6a9089fc2a 100644 --- a/proxy/src/serverless/conn_pool.rs +++ b/proxy/src/serverless/conn_pool.rs @@ -1,17 +1,17 @@ use std::fmt; use std::pin::pin; use std::sync::{Arc, Weak}; -use std::task::{ready, Poll}; +use std::task::{Poll, ready}; -use futures::future::poll_fn; use futures::Future; -use postgres_client::tls::NoTlsStream; +use futures::future::poll_fn; use postgres_client::AsyncMessage; +use postgres_client::tls::NoTlsStream; use smallvec::SmallVec; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{error, info, info_span, warn, Instrument}; +use tracing::{Instrument, error, info, info_span, warn}; #[cfg(test)] use { super::conn_pool_lib::GlobalConnPoolOptions, diff --git a/proxy/src/serverless/conn_pool_lib.rs b/proxy/src/serverless/conn_pool_lib.rs index 9e21491655..933204994b 100644 --- a/proxy/src/serverless/conn_pool_lib.rs +++ b/proxy/src/serverless/conn_pool_lib.rs @@ -10,7 +10,7 @@ use parking_lot::RwLock; use postgres_client::ReadyForQueryStatus; use rand::Rng; use smol_str::ToSmolStr; -use tracing::{debug, info, Span}; +use tracing::{Span, debug, info}; use super::backend::HttpConnError; use super::conn_pool::ClientDataRemote; diff --git a/proxy/src/serverless/http_conn_pool.rs b/proxy/src/serverless/http_conn_pool.rs index fa21f24a1c..338a79b4b3 100644 --- a/proxy/src/serverless/http_conn_pool.rs +++ b/proxy/src/serverless/http_conn_pool.rs @@ -7,7 +7,7 @@ use hyper_util::rt::{TokioExecutor, TokioIo}; use parking_lot::RwLock; use smol_str::ToSmolStr; use tokio::net::TcpStream; -use tracing::{debug, error, info, info_span, Instrument}; +use tracing::{Instrument, debug, error, info, info_span}; use super::backend::HttpConnError; use super::conn_pool_lib::{ diff --git a/proxy/src/serverless/json.rs b/proxy/src/serverless/json.rs index ab012bd020..fbd12ad9cb 100644 --- a/proxy/src/serverless/json.rs +++ b/proxy/src/serverless/json.rs @@ -1,5 +1,5 @@ -use postgres_client::types::{Kind, Type}; use postgres_client::Row; +use postgres_client::types::{Kind, Type}; use serde_json::{Map, Value}; // diff --git a/proxy/src/serverless/local_conn_pool.rs b/proxy/src/serverless/local_conn_pool.rs index 137a2d6377..8426a0810e 100644 --- a/proxy/src/serverless/local_conn_pool.rs +++ b/proxy/src/serverless/local_conn_pool.rs @@ -11,24 +11,24 @@ use std::collections::HashMap; use std::pin::pin; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use std::task::{ready, Poll}; +use std::sync::atomic::AtomicUsize; +use std::task::{Poll, ready}; use std::time::Duration; use ed25519_dalek::{Signature, Signer, SigningKey}; -use futures::future::poll_fn; use futures::Future; +use futures::future::poll_fn; use indexmap::IndexMap; use jose_jwk::jose_b64::base64ct::{Base64UrlUnpadded, Encoding}; use parking_lot::RwLock; -use postgres_client::tls::NoTlsStream; use 
postgres_client::AsyncMessage; +use postgres_client::tls::NoTlsStream; use serde_json::value::RawValue; use tokio::net::TcpStream; use tokio::time::Instant; use tokio_util::sync::CancellationToken; -use tracing::{debug, error, info, info_span, warn, Instrument}; +use tracing::{Instrument, debug, error, info, info_span, warn}; use super::backend::HttpConnError; use super::conn_pool_lib::{ @@ -389,6 +389,9 @@ mod tests { // }); // println!("{}", serde_json::to_string(&jwk).unwrap()); - assert_eq!(jwt, "eyJhbGciOiJFZERTQSJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.Cvyc2By33KI0f0obystwdy8PN111L3Sc9_Mr2CU3XshtSqSdxuRxNEZGbb_RvyJf2IzheC_s7aBZ-jLeQ9N0Bg"); + assert_eq!( + jwt, + "eyJhbGciOiJFZERTQSJ9.eyJmb28iOiJiYXIiLCJqdGkiOjIsIm5lc3RlZCI6eyJqdGkiOiJ0cmlja3kgbmVzdGluZyJ9fQ.Cvyc2By33KI0f0obystwdy8PN111L3Sc9_Mr2CU3XshtSqSdxuRxNEZGbb_RvyJf2IzheC_s7aBZ-jLeQ9N0Bg" + ); } } diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index 8289500159..dd0fb9c5b4 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -15,7 +15,7 @@ mod sql_over_http; mod websocket; use std::net::{IpAddr, SocketAddr}; -use std::pin::{pin, Pin}; +use std::pin::{Pin, pin}; use std::sync::Arc; use anyhow::Context; @@ -23,8 +23,8 @@ use async_trait::async_trait; use atomic_take::AtomicTake; use bytes::Bytes; pub use conn_pool_lib::GlobalConnPoolOptions; -use futures::future::{select, Either}; use futures::TryFutureExt; +use futures::future::{Either, select}; use http::{Method, Response, StatusCode}; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Empty}; @@ -32,23 +32,23 @@ use http_utils::error::ApiError; use hyper::body::Incoming; use hyper_util::rt::TokioExecutor; use hyper_util::server::conn::auto::Builder; -use rand::rngs::StdRng; use rand::SeedableRng; -use sql_over_http::{uuid_to_header_value, NEON_REQUEST_ID}; +use rand::rngs::StdRng; +use sql_over_http::{NEON_REQUEST_ID, uuid_to_header_value}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpStream}; use tokio::time::timeout; use tokio_rustls::TlsAcceptor; use tokio_util::sync::CancellationToken; use tokio_util::task::TaskTracker; -use tracing::{info, warn, Instrument}; +use tracing::{Instrument, info, warn}; use crate::cancellation::CancellationHandler; use crate::config::{ProxyConfig, ProxyProtocolV2}; use crate::context::RequestContext; use crate::ext::TaskExt; use crate::metrics::Metrics; -use crate::protocol2::{read_proxy_protocol, ChainRW, ConnectHeader, ConnectionInfo}; +use crate::protocol2::{ChainRW, ConnectHeader, ConnectionInfo, read_proxy_protocol}; use crate::proxy::run_until_cancelled; use crate::rate_limiter::EndpointRateLimiter; use crate::serverless::backend::PoolingBackend; diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index 7c21d90ed8..8babfb5cd2 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -2,23 +2,23 @@ use std::pin::pin; use std::sync::Arc; use bytes::Bytes; -use futures::future::{select, try_join, Either}; +use futures::future::{Either, select, try_join}; use futures::{StreamExt, TryFutureExt}; -use http::header::AUTHORIZATION; use http::Method; +use http::header::AUTHORIZATION; use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; use http_utils::error::ApiError; use hyper::body::Incoming; use hyper::http::{HeaderName, HeaderValue}; -use hyper::{header, HeaderMap, Request, Response, StatusCode}; +use 
hyper::{HeaderMap, Request, Response, StatusCode, header}; use indexmap::IndexMap; use postgres_client::error::{DbError, ErrorPosition, SqlState}; use postgres_client::{GenericClient, IsolationLevel, NoTls, ReadyForQueryStatus, Transaction}; use pq_proto::StartupMessageParamsBuilder; use serde::Serialize; -use serde_json::value::RawValue; use serde_json::Value; +use serde_json::value::RawValue; use tokio::time::{self, Instant}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, info}; @@ -31,15 +31,15 @@ use super::conn_pool::{AuthData, ConnInfoWithAuth}; use super::conn_pool_lib::{self, ConnInfo}; use super::error::HttpCodeError; use super::http_util::json_response; -use super::json::{json_to_pg_text, pg_text_row_to_json, JsonConversionError}; +use super::json::{JsonConversionError, json_to_pg_text, pg_text_row_to_json}; use crate::auth::backend::{ComputeCredentialKeys, ComputeUserInfo}; -use crate::auth::{endpoint_sni, ComputeUserInfoParseError}; +use crate::auth::{ComputeUserInfoParseError, endpoint_sni}; use crate::config::{AuthenticationConfig, HttpConfig, ProxyConfig, TlsConfig}; use crate::context::RequestContext; use crate::error::{ErrorKind, ReportableError, UserFacingError}; -use crate::http::{read_body_with_limit, ReadBodyError}; +use crate::http::{ReadBodyError, read_body_with_limit}; use crate::metrics::{HttpDirection, Metrics}; -use crate::proxy::{run_until_cancelled, NeonOptions}; +use crate::proxy::{NeonOptions, run_until_cancelled}; use crate::serverless::backend::HttpConnError; use crate::types::{DbName, RoleName}; use crate::usage_metrics::{MetricCounter, MetricCounterRecorder, TrafficDirection}; @@ -1021,7 +1021,7 @@ async fn query_to_json( data: QueryData, current_size: &mut usize, parsed_headers: HttpHeaders, -) -> Result<(ReadyForQueryStatus, impl Serialize), SqlOverHttpError> { +) -> Result<(ReadyForQueryStatus, impl Serialize + use<>), SqlOverHttpError> { let query_start = Instant::now(); let query_params = data.params; diff --git a/proxy/src/serverless/websocket.rs b/proxy/src/serverless/websocket.rs index 585a7d63b2..c4baeeb5cc 100644 --- a/proxy/src/serverless/websocket.rs +++ b/proxy/src/serverless/websocket.rs @@ -1,6 +1,6 @@ use std::pin::Pin; use std::sync::Arc; -use std::task::{ready, Context, Poll}; +use std::task::{Context, Poll, ready}; use anyhow::Context as _; use bytes::{Buf, BufMut, Bytes, BytesMut}; @@ -15,9 +15,9 @@ use tracing::warn; use crate::cancellation::CancellationHandler; use crate::config::ProxyConfig; use crate::context::RequestContext; -use crate::error::{io_error, ReportableError}; +use crate::error::{ReportableError, io_error}; use crate::metrics::Metrics; -use crate::proxy::{handle_client, ClientMode, ErrorSource}; +use crate::proxy::{ClientMode, ErrorSource, handle_client}; use crate::rate_limiter::EndpointRateLimiter; pin_project!
{ @@ -184,11 +184,11 @@ mod tests { use framed_websockets::WebSocketServer; use futures::{SinkExt, StreamExt}; - use tokio::io::{duplex, AsyncReadExt, AsyncWriteExt}; + use tokio::io::{AsyncReadExt, AsyncWriteExt, duplex}; use tokio::task::JoinSet; - use tokio_tungstenite::tungstenite::protocol::Role; - use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::WebSocketStream; + use tokio_tungstenite::tungstenite::Message; + use tokio_tungstenite::tungstenite::protocol::Role; use super::WebSocketRw; diff --git a/proxy/src/signals.rs b/proxy/src/signals.rs index 0b675683c0..32b2344a1c 100644 --- a/proxy/src/signals.rs +++ b/proxy/src/signals.rs @@ -12,7 +12,7 @@ pub async fn handle( where F: FnMut(), { - use tokio::signal::unix::{signal, SignalKind}; + use tokio::signal::unix::{SignalKind, signal}; let mut hangup = signal(SignalKind::hangup())?; let mut interrupt = signal(SignalKind::interrupt())?; diff --git a/proxy/src/tls/postgres_rustls.rs b/proxy/src/tls/postgres_rustls.rs index 0ad279b635..f09e916a1d 100644 --- a/proxy/src/tls/postgres_rustls.rs +++ b/proxy/src/tls/postgres_rustls.rs @@ -2,8 +2,8 @@ use std::convert::TryFrom; use std::sync::Arc; use postgres_client::tls::MakeTlsConnect; -use rustls::pki_types::ServerName; use rustls::ClientConfig; +use rustls::pki_types::ServerName; use tokio::io::{AsyncRead, AsyncWrite}; mod private { @@ -15,8 +15,8 @@ mod private { use postgres_client::tls::{ChannelBinding, TlsConnect}; use rustls::pki_types::ServerName; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - use tokio_rustls::client::TlsStream; use tokio_rustls::TlsConnector; + use tokio_rustls::client::TlsStream; use crate::tls::TlsServerEndPoint; diff --git a/proxy/src/tls/server_config.rs b/proxy/src/tls/server_config.rs index 2cc1657eea..903c0b712b 100644 --- a/proxy/src/tls/server_config.rs +++ b/proxy/src/tls/server_config.rs @@ -1,12 +1,12 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use itertools::Itertools; use rustls::crypto::ring::{self, sign}; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use super::{TlsServerEndPoint, PG_ALPN_PROTOCOL}; +use super::{PG_ALPN_PROTOCOL, TlsServerEndPoint}; pub struct TlsConfig { pub config: Arc, diff --git a/proxy/src/usage_metrics.rs b/proxy/src/usage_metrics.rs index 6a23f0e129..004d268fa1 100644 --- a/proxy/src/usage_metrics.rs +++ b/proxy/src/usage_metrics.rs @@ -2,17 +2,17 @@ //! and push them to a HTTP endpoint. 
use std::borrow::Cow; use std::convert::Infallible; -use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::time::Duration; -use anyhow::{bail, Context}; +use anyhow::{Context, bail}; use async_compression::tokio::write::GzipEncoder; use bytes::Bytes; use chrono::{DateTime, Datelike, Timelike, Utc}; -use clashmap::mapref::entry::Entry; use clashmap::ClashMap; -use consumption_metrics::{idempotency_key, Event, EventChunk, EventType, CHUNK_SIZE}; +use clashmap::ClashMap; +use clashmap::mapref::entry::Entry; +use consumption_metrics::{CHUNK_SIZE, Event, EventChunk, EventType, idempotency_key}; use once_cell::sync::Lazy; use remote_storage::{GenericRemoteStorage, RemotePath, TimeoutOrCancel}; use serde::{Deserialize, Serialize}; @@ -62,11 +62,7 @@ mod none_as_empty_string { d: D, ) -> Result<Option<SmolStr>, D::Error> { let s = SmolStr::deserialize(d)?; - if s.is_empty() { - Ok(None) - } else { - Ok(Some(s)) - } + if s.is_empty() { Ok(None) } else { Ok(Some(s)) } } } diff --git a/pyproject.toml b/pyproject.toml index 92a660c233..c6e5073bcd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,7 +25,7 @@ prometheus-client = "^0.14.1" pytest-timeout = "^2.3.1" Werkzeug = "^3.0.6" pytest-order = "^1.1.0" -allure-pytest = "^2.13.2" +allure-pytest = "^2.13.5" pytest-asyncio = "^0.21.0" toml = "^0.10.2" psutil = "^5.9.4" diff --git a/storage_controller/Cargo.toml b/storage_controller/Cargo.toml index 08c80bc141..8e82996db1 100644 --- a/storage_controller/Cargo.toml +++ b/storage_controller/Cargo.toml @@ -18,6 +18,7 @@ anyhow.workspace = true bytes.workspace = true chrono.workspace = true clap.workspace = true +cron.workspace = true fail.workspace = true futures.workspace = true hex.workspace = true diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 18922b9e05..4152e40a76 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -115,10 +115,14 @@ struct Cli { #[arg(long)] neon_local_repo_dir: Option, - /// Chaos testing + /// Chaos testing: exercise tenant migrations #[arg(long)] chaos_interval: Option, + /// Chaos testing: exercise an immediate exit + #[arg(long)] + chaos_exit_crontab: Option<cron::Schedule>, + // Maximum acceptable lag for the secondary location while draining // a pageserver #[arg(long)] @@ -382,10 +386,12 @@ async fn async_main() -> anyhow::Result<()> { let service = service.clone(); let cancel = CancellationToken::new(); let cancel_bg = cancel.clone(); + let chaos_exit_crontab = args.chaos_exit_crontab; ( tokio::task::spawn( async move { - let mut chaos_injector = ChaosInjector::new(service, interval.into()); + let mut chaos_injector = + ChaosInjector::new(service, interval.into(), chaos_exit_crontab); chaos_injector.run(cancel_bg).await } .instrument(tracing::info_span!("chaos_injector")), diff --git a/storage_controller/src/service/chaos_injector.rs b/storage_controller/src/service/chaos_injector.rs index aa0ee0df5a..25a0fab5ca 100644 --- a/storage_controller/src/service/chaos_injector.rs +++ b/storage_controller/src/service/chaos_injector.rs @@ -16,29 +16,80 @@ use super::{Node, Scheduler, Service, TenantShard}; pub struct ChaosInjector { service: Arc<Service>, interval: Duration, + chaos_exit_crontab: Option<cron::Schedule>, +} + +fn cron_to_next_duration(cron: &cron::Schedule) -> anyhow::Result<tokio::time::Sleep> { + use chrono::Utc; + let next = cron.upcoming(Utc).next().unwrap(); + let duration = (next - Utc::now()).to_std()?; + Ok(tokio::time::sleep(duration)) +} + +async fn maybe_sleep(sleep: Option<tokio::time::Sleep>) -> Option<()> { +
if let Some(sleep) = sleep { + sleep.await; + Some(()) + } else { + None + } } impl ChaosInjector { - pub fn new(service: Arc<Service>, interval: Duration) -> Self { - Self { service, interval } + pub fn new( + service: Arc<Service>, + interval: Duration, + chaos_exit_crontab: Option<cron::Schedule>, + ) -> Self { + Self { + service, + interval, + chaos_exit_crontab, + } } pub async fn run(&mut self, cancel: CancellationToken) { let mut interval = tokio::time::interval(self.interval); - - loop { - tokio::select! { - _ = interval.tick() => {} - _ = cancel.cancelled() => { - tracing::info!("Shutting down"); - return; + let cron_interval = { + if let Some(ref chaos_exit_crontab) = self.chaos_exit_crontab { + match cron_to_next_duration(chaos_exit_crontab) { + Ok(interval_exit) => Some(interval_exit), + Err(e) => { + tracing::error!("Error processing the cron schedule: {e}"); + None + } } + } else { + None } - - self.inject_chaos().await; - - tracing::info!("Chaos iteration..."); + }; + enum ChaosEvent { + ShuffleTenant, + ForceKill, } + let chaos_type = tokio::select! { + _ = interval.tick() => { + ChaosEvent::ShuffleTenant + } + Some(_) = maybe_sleep(cron_interval) => { + ChaosEvent::ForceKill + } + _ = cancel.cancelled() => { + tracing::info!("Shutting down"); + return; + } + }; + + match chaos_type { + ChaosEvent::ShuffleTenant => { + self.inject_chaos().await; + } + ChaosEvent::ForceKill => { + self.force_kill().await; + } + } + + tracing::info!("Chaos iteration..."); } /// If a shard has a secondary and attached location, then re-assign the secondary to be @@ -95,6 +146,11 @@ impl ChaosInjector { ); } + async fn force_kill(&mut self) { + tracing::warn!("Injecting chaos: force kill"); + std::process::exit(1); + } + async fn inject_chaos(&mut self) { // Pick some shards to interfere with let batch_size = 128; diff --git a/test_runner/regress/test_pageserver_secondary.py b/test_runner/regress/test_pageserver_secondary.py index 602d493ae6..a9b897b741 100644 --- a/test_runner/regress/test_pageserver_secondary.py +++ b/test_runner/regress/test_pageserver_secondary.py @@ -8,9 +8,10 @@ from pathlib import Path from typing import TYPE_CHECKING import pytest -from fixtures.common_types import TenantId, TenantShardId, TimelineId +from fixtures.common_types import TenantId, TenantShardId, TimelineArchivalState, TimelineId from fixtures.log_helper import log from fixtures.neon_fixtures import ( + DEFAULT_BRANCH_NAME, NeonEnvBuilder, NeonPageserver, StorageControllerMigrationConfig, @@ -927,8 +928,12 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): workload.write_rows(128, upload=True) workload.write_rows(128, upload=True) workload.write_rows(128, upload=True) + + child_timeline_id = env.create_branch( + "foo", tenant_id, ancestor_branch_name=DEFAULT_BRANCH_NAME + ) + workload.write_rows(128, upload=True) - workload.stop() # Expect lots of layers assert len(ps_attached.list_layers(tenant_id, timeline_id)) > 10 @@ -937,9 +942,19 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): for ps in env.pageservers: ps.http_client().configure_failpoints([("secondary-layer-download-sleep", "return(1000)")]) + def timeline_heatmap(tlid): + assert env.pageserver_remote_storage is not None + + heatmap = env.pageserver_remote_storage.heatmap_content(tenant_id) + for htl in heatmap["timelines"]: + if htl["timeline_id"] == str(tlid): + return htl + + raise RuntimeError(f"No heatmap for timeline: {tlid}") + + # Upload a heatmap, so that secondaries have something to download
ps_attached.http_client().tenant_heatmap_upload(tenant_id) - heatmap_before_migration = env.pageserver_remote_storage.heatmap_content(tenant_id) + heatmap_before_migration = timeline_heatmap(timeline_id) # This has no chance to succeed: we have lots of layers and each one takes at least 1000ms. # However, it pulls the heatmap, which will be important later. @@ -971,17 +986,12 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): assert env.storage_controller.locate(tenant_id)[0]["node_id"] == ps_secondary.id ps_secondary.http_client().tenant_heatmap_upload(tenant_id) - heatmap_after_migration = env.pageserver_remote_storage.heatmap_content(tenant_id) + heatmap_after_migration = timeline_heatmap(timeline_id) - assert len(heatmap_before_migration["timelines"][0]["layers"]) > 0 + assert len(heatmap_before_migration["layers"]) > 0 - # The new layer map should contain all the layers in the pre-migration one - # and a new in memory layer - after_migration_heatmap_layers_count = len(heatmap_after_migration["timelines"][0]["layers"]) - assert ( - len(heatmap_before_migration["timelines"][0]["layers"]) + 1 - == after_migration_heatmap_layers_count - ) + after_migration_heatmap_layers_count = len(heatmap_after_migration["layers"]) + assert len(heatmap_before_migration["layers"]) <= after_migration_heatmap_layers_count log.info(f"Heatmap size after cold migration is {after_migration_heatmap_layers_count}") @@ -989,10 +999,71 @@ def test_migration_to_cold_secondary(neon_env_builder: NeonEnvBuilder): TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id ) - def all_layers_downloaded(): + # Now simulate the case where a child timeline is archived, parent layers + # are evicted and the child is unarchived. When the child is unarchived, + # itself and the parent update their heatmaps to contain layers needed by the + # child. One can warm up the timeline hierarchy since the heatmaps are ready. 
+ + def all_layers_downloaded(expected_layer_count: int): local_layers_count = len(ps_secondary.list_layers(tenant_id, timeline_id)) log.info(f"{local_layers_count=} {after_migration_heatmap_layers_count=}") - assert local_layers_count == after_migration_heatmap_layers_count + assert local_layers_count >= expected_layer_count - wait_until(all_layers_downloaded) + wait_until(lambda: all_layers_downloaded(after_migration_heatmap_layers_count)) + ps_secondary.http_client().tenant_heatmap_upload(tenant_id) + + before = ( + ps_secondary.http_client() + .get_metrics() + .query_one("pageserver_remote_ondemand_downloaded_layers_total") + .value + ) + workload.validate() + after = ( + ps_secondary.http_client() + .get_metrics() + .query_one("pageserver_remote_ondemand_downloaded_layers_total") + .value + ) + + workload.stop() + assert before == after + + def check_archival_state(state: TimelineArchivalState, tline): + timelines = ( + timeline["timeline_id"] + for timeline in ps_secondary.http_client().timeline_list(tenant_id=tenant_id) + ) + + if state == TimelineArchivalState.ARCHIVED: + assert str(tline) not in timelines + elif state == TimelineArchivalState.UNARCHIVED: + assert str(tline) in timelines + + ps_secondary.http_client().timeline_archival_config( + tenant_id, child_timeline_id, TimelineArchivalState.ARCHIVED + ) + ps_secondary.http_client().timeline_offload(tenant_id, child_timeline_id) + wait_until(lambda: check_archival_state(TimelineArchivalState.ARCHIVED, child_timeline_id)) + + ps_secondary.http_client().evict_all_layers(tenant_id, timeline_id) + ps_secondary.http_client().tenant_heatmap_upload(tenant_id) + assert len(timeline_heatmap(timeline_id)["layers"]) == 0 + + ps_secondary.http_client().timeline_archival_config( + tenant_id, child_timeline_id, TimelineArchivalState.UNARCHIVED + ) + wait_until(lambda: check_archival_state(TimelineArchivalState.UNARCHIVED, child_timeline_id)) + + ps_secondary.http_client().tenant_heatmap_upload(tenant_id) + log.info(f"Parent timeline heatmap size: {len(timeline_heatmap(timeline_id)['layers'])}") + log.info(f"Child timeline heatmap size: {len(timeline_heatmap(child_timeline_id)['layers'])}") + + expected_locally = len(timeline_heatmap(timeline_id)["layers"]) + assert expected_locally > 0 + + env.storage_controller.download_heatmap_layers( + TenantShardId(tenant_id, shard_number=0, shard_count=0), timeline_id + ) + wait_until(lambda: all_layers_downloaded(expected_locally))
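Note: a minimal, self-contained sketch of how the new chaos-exit schedule is turned into a one-shot wait, mirroring cron_to_next_duration above. It assumes the cron 0.15 and chrono crates added by this change; the schedule string and the main wrapper are illustrative only, not part of the patch.

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The `cron` crate uses a 7-field expression: sec min hour day-of-month month day-of-week year.
    // This illustrative schedule fires at the top of every hour.
    let schedule: cron::Schedule = "0 0 * * * * *"
        .parse()
        .map_err(|e| format!("invalid cron expression: {e}"))?;

    // Next upcoming firing time in UTC, as queried in cron_to_next_duration.
    let next = schedule
        .upcoming(chrono::Utc)
        .next()
        .ok_or("schedule has no upcoming firing time")?;

    // chrono duration -> std duration; in the chaos injector this feeds tokio::time::sleep,
    // after which force_kill() exits the storage controller process.
    let wait = (next - chrono::Utc::now()).to_std()?;
    println!("next forced exit would fire in {wait:?}");
    Ok(())
}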