From f8d530d0313485bdcef0e9ab9b36010753a9c60a Mon Sep 17 00:00:00 2001 From: Folke Behrens Date: Thu, 5 Jun 2025 12:42:10 +0200 Subject: [PATCH] Move run_until_cancelled into util mod. --- proxy/src/binary/pg_sni_router.rs | 5 ++--- proxy/src/console_redirect_proxy.rs | 5 ++--- proxy/src/lib.rs | 1 + proxy/src/proxy/mod.rs | 17 ++--------------- proxy/src/serverless/mod.rs | 2 +- proxy/src/serverless/sql_over_http.rs | 3 ++- proxy/src/util.rs | 16 ++++++++++++++++ 7 files changed, 26 insertions(+), 23 deletions(-) create mode 100644 proxy/src/util.rs diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs index a4f517fead..481bd8501c 100644 --- a/proxy/src/binary/pg_sni_router.rs +++ b/proxy/src/binary/pg_sni_router.rs @@ -28,10 +28,9 @@ use crate::context::RequestContext; use crate::metrics::{Metrics, ThreadPoolMetrics}; use crate::pqproto::FeStartupPacket; use crate::protocol2::ConnectionInfo; -use crate::proxy::{ - ErrorSource, TlsRequired, copy_bidirectional_client_compute, run_until_cancelled, -}; +use crate::proxy::{ErrorSource, TlsRequired, copy_bidirectional_client_compute}; use crate::stream::{PqStream, Stream}; +use crate::util::run_until_cancelled; project_git_version!(GIT_VERSION); diff --git a/proxy/src/console_redirect_proxy.rs b/proxy/src/console_redirect_proxy.rs index 324dcf5824..89641019e0 100644 --- a/proxy/src/console_redirect_proxy.rs +++ b/proxy/src/console_redirect_proxy.rs @@ -15,9 +15,8 @@ use crate::pglb::connect_compute::{TcpMechanism, connect_to_compute}; use crate::pglb::handshake::{HandshakeData, handshake}; use crate::pglb::passthrough::ProxyPassthrough; use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol}; -use crate::proxy::{ - ClientRequestError, ErrorSource, prepare_client_connection, run_until_cancelled, -}; +use crate::proxy::{ClientRequestError, ErrorSource, prepare_client_connection}; +use crate::util::run_until_cancelled; pub async fn task_main( config: &'static ProxyConfig, diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index d65d056585..026c6aeba9 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -106,4 +106,5 @@ mod tls; mod types; mod url; mod usage_metrics; +mod util; mod waiters; diff --git a/proxy/src/proxy/mod.rs b/proxy/src/proxy/mod.rs index 0e00c4f97e..23bb873b2f 100644 --- a/proxy/src/proxy/mod.rs +++ b/proxy/src/proxy/mod.rs @@ -3,6 +3,7 @@ mod tests; pub(crate) mod retry; pub(crate) mod wake_compute; + use std::sync::Arc; use futures::FutureExt; @@ -30,6 +31,7 @@ use crate::protocol2::{ConnectHeader, ConnectionInfo, ConnectionInfoExtra, read_ use crate::rate_limiter::EndpointRateLimiter; use crate::stream::{PqStream, Stream}; use crate::types::EndpointCacheKey; +use crate::util::run_until_cancelled; use crate::{auth, compute}; const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; @@ -46,21 +48,6 @@ impl ReportableError for TlsRequired { impl UserFacingError for TlsRequired {} -pub async fn run_until_cancelled( - f: F, - cancellation_token: &CancellationToken, -) -> Option { - match futures::future::select( - std::pin::pin!(f), - std::pin::pin!(cancellation_token.cancelled()), - ) - .await - { - futures::future::Either::Left((f, _)) => Some(f), - futures::future::Either::Right(((), _)) => None, - } -} - pub async fn task_main( config: &'static ProxyConfig, auth_backend: &'static auth::Backend<'static, ()>, diff --git a/proxy/src/serverless/mod.rs b/proxy/src/serverless/mod.rs index f6f681ac45..ed33bf1246 100644 --- a/proxy/src/serverless/mod.rs +++ b/proxy/src/serverless/mod.rs @@ -50,10 +50,10 @@ use crate::context::RequestContext; use crate::ext::TaskExt; use crate::metrics::Metrics; use crate::protocol2::{ConnectHeader, ConnectionInfo, read_proxy_protocol}; -use crate::proxy::run_until_cancelled; use crate::rate_limiter::EndpointRateLimiter; use crate::serverless::backend::PoolingBackend; use crate::serverless::http_util::{api_error_into_response, json_response}; +use crate::util::run_until_cancelled; pub(crate) const SERVERLESS_DRIVER_SNI: &str = "api"; pub(crate) const AUTH_BROKER_SNI: &str = "apiauth"; diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs index eb80ac9ad0..b2eb801f5c 100644 --- a/proxy/src/serverless/sql_over_http.rs +++ b/proxy/src/serverless/sql_over_http.rs @@ -41,10 +41,11 @@ use crate::error::{ErrorKind, ReportableError, UserFacingError}; use crate::http::{ReadBodyError, read_body_with_limit}; use crate::metrics::{HttpDirection, Metrics, SniGroup, SniKind}; use crate::pqproto::StartupMessageParams; -use crate::proxy::{NeonOptions, run_until_cancelled}; +use crate::proxy::NeonOptions; use crate::serverless::backend::HttpConnError; use crate::types::{DbName, RoleName}; use crate::usage_metrics::{MetricCounter, MetricCounterRecorder}; +use crate::util::run_until_cancelled; #[derive(serde::Deserialize)] #[serde(rename_all = "camelCase")] diff --git a/proxy/src/util.rs b/proxy/src/util.rs new file mode 100644 index 0000000000..1face24398 --- /dev/null +++ b/proxy/src/util.rs @@ -0,0 +1,16 @@ +use tokio_util::sync::CancellationToken; + +pub async fn run_until_cancelled( + f: F, + cancellation_token: &CancellationToken, +) -> Option { + match futures::future::select( + std::pin::pin!(f), + std::pin::pin!(cancellation_token.cancelled()), + ) + .await + { + futures::future::Either::Left((f, _)) => Some(f), + futures::future::Either::Right(((), _)) => None, + } +}