diff --git a/Cargo.lock b/Cargo.lock index 23867eb2e4..04be1ecbd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4016,6 +4016,29 @@ dependencies = [ "indexmap 1.9.3", ] +[[package]] +name = "pg_sni_router" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "futures", + "git-version", + "itertools 0.10.5", + "pq_proto", + "proxy-core", + "proxy-sasl", + "rustls 0.22.4", + "rustls-pemfile 2.1.1", + "socket2 0.5.5", + "tokio", + "tokio-util", + "tracing", + "tracing-utils", + "utils", + "uuid", +] + [[package]] name = "phf" version = "0.11.1" @@ -4413,6 +4436,34 @@ dependencies = [ [[package]] name = "proxy" version = "0.1.0" +dependencies = [ + "anyhow", + "aws-config", + "clap", + "futures", + "git-version", + "humantime", + "itertools 0.10.5", + "metrics", + "pq_proto", + "proxy-core", + "proxy-sasl", + "remote_storage", + "rustls 0.22.4", + "rustls-pemfile 2.1.1", + "socket2 0.5.5", + "tikv-jemallocator", + "tokio", + "tokio-util", + "tracing", + "tracing-utils", + "utils", + "uuid", +] + +[[package]] +name = "proxy-core" +version = "0.1.0" dependencies = [ "ahash", "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 8f2512fd5e..addf982083 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,8 @@ members = [ "pageserver/pagebench", "proxy/core", "proxy/sasl", + "proxy/proxy", + "proxy/pg_sni_router", "safekeeper", "storage_broker", "storage_controller", diff --git a/proxy/core/Cargo.toml b/proxy/core/Cargo.toml index 6adfc9b2e0..c9e5be4ce7 100644 --- a/proxy/core/Cargo.toml +++ b/proxy/core/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "proxy" +name = "proxy-core" version = "0.1.0" edition.workspace = true license.workspace = true diff --git a/proxy/pg_sni_router/Cargo.toml b/proxy/pg_sni_router/Cargo.toml new file mode 100644 index 0000000000..e772936149 --- /dev/null +++ b/proxy/pg_sni_router/Cargo.toml @@ -0,0 +1,129 @@ +[package] +name = "pg_sni_router" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[features] +default = [] +testing = [] + +[dependencies] +proxy-sasl = { version = "0.1", path = "../sasl" } +proxy-core = { version = "0.1", path = "../core" } + +# ahash.workspace = true +anyhow.workspace = true +# arc-swap.workspace = true +# async-compression.workspace = true +# async-trait.workspace = true +# atomic-take.workspace = true +# aws-config.workspace = true +# aws-sdk-iam.workspace = true +# aws-sigv4.workspace = true +# aws-types.workspace = true +# base64.workspace = true +# bstr.workspace = true +# bytes = { workspace = true, features = ["serde"] } +# camino.workspace = true +# chrono.workspace = true +clap.workspace = true +# consumption_metrics.workspace = true +# crossbeam-deque.workspace = true +# dashmap.workspace = true +# env_logger.workspace = true +# framed-websockets.workspace = true +futures.workspace = true +git-version.workspace = true +# hashbrown.workspace = true +# hashlink.workspace = true +# hex.workspace = true +# hmac.workspace = true +# hostname.workspace = true +# http.workspace = true +# humantime.workspace = true +# humantime-serde.workspace = true +# hyper.workspace = true +# hyper1 = { package = "hyper", version = "1.2", features = ["server"] } +# hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] } +# http-body-util = { version = "0.1" } +# indexmap.workspace = true +# ipnet.workspace = true +itertools.workspace = true +# lasso = { workspace = true, features = ["multi-threaded"] } +# md5.workspace = true +# measured = { workspace = true, features = ["lasso"] } +# metrics.workspace = true +# once_cell.workspace = true +# opentelemetry.workspace = true +# parking_lot.workspace = true +# parquet.workspace = true +# parquet_derive.workspace = true +# pin-project-lite.workspace = true +# postgres_backend.workspace = true +pq_proto.workspace = true +# # prometheus.workspace = true +# rand.workspace = true +# regex.workspace = true +# remote_storage = { version = "0.1", path = "../../libs/remote_storage/" } +# reqwest.workspace = true +# reqwest-middleware = { workspace = true, features = ["json"] } +# reqwest-retry.workspace = true +# reqwest-tracing.workspace = true +# routerify.workspace = true +# rustc-hash.workspace = true +rustls-pemfile.workspace = true +rustls.workspace = true +# scopeguard.workspace = true +# serde.workspace = true +# serde_json.workspace = true +# sha2 = { workspace = true, features = ["asm", "oid"] } +# smol_str.workspace = true +# smallvec.workspace = true +socket2.workspace = true +# subtle.workspace = true +# task-local-extensions.workspace = true +# thiserror.workspace = true +# tikv-jemallocator.workspace = true +# tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] } +# tokio-postgres.workspace = true +# tokio-postgres-rustls.workspace = true +# tokio-rustls.workspace = true +tokio-util.workspace = true +tokio = { workspace = true, features = ["signal"] } +# tower-service.workspace = true +# tracing-opentelemetry.workspace = true +# tracing-subscriber.workspace = true +tracing-utils.workspace = true +tracing.workspace = true +# try-lock.workspace = true +# typed-json.workspace = true +# url.workspace = true +# urlencoding.workspace = true +utils.workspace = true +uuid.workspace = true +# rustls-native-certs.workspace = true +# x509-parser.workspace = true +# postgres-protocol.workspace = true +# redis.workspace = true + +# # jwt stuff +# jose-jwa = "0.1.2" +# jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] } +# signature = "2" +# ecdsa = "0.16" +# p256 = "0.13" +# rsa = "0.9" + +# workspace_hack.workspace = true + +# [dev-dependencies] +# camino-tempfile.workspace = true +# fallible-iterator.workspace = true +# tokio-tungstenite.workspace = true +# pbkdf2 = { workspace = true, features = ["simple", "std"] } +# rcgen.workspace = true +# rstest.workspace = true +# tokio-postgres-rustls.workspace = true +# walkdir.workspace = true +# rand_distr = "0.4" diff --git a/proxy/core/src/bin/pg_sni_router.rs b/proxy/pg_sni_router/src/main.rs similarity index 93% rename from proxy/core/src/bin/pg_sni_router.rs rename to proxy/pg_sni_router/src/main.rs index 0ac7e6d965..21dc618de0 100644 --- a/proxy/core/src/bin/pg_sni_router.rs +++ b/proxy/pg_sni_router/src/main.rs @@ -7,9 +7,9 @@ use std::{net::SocketAddr, sync::Arc}; use futures::future::Either; use itertools::Itertools; -use proxy::context::RequestMonitoring; -use proxy::metrics::Metrics; -use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource}; +use proxy_core::context::RequestMonitoring; +use proxy_core::metrics::Metrics; +use proxy_core::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource}; use proxy_sasl::scram::threadpool::ThreadPoolMetrics; use proxy_sasl::scram::TlsServerEndPoint; use rustls::pki_types::PrivateKeyDer; @@ -18,7 +18,7 @@ use tokio::net::TcpListener; use anyhow::{anyhow, bail, ensure, Context}; use clap::Arg; use futures::TryFutureExt; -use proxy::stream::{PqStream, Stream}; +use proxy_core::stream::{PqStream, Stream}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio_util::sync::CancellationToken; @@ -63,7 +63,7 @@ fn cli() -> clap::Command { #[tokio::main] async fn main() -> anyhow::Result<()> { - let _logging_guard = proxy::logging::init().await?; + let _logging_guard = proxy_core::logging::init().await?; let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); @@ -134,14 +134,14 @@ async fn main() -> anyhow::Result<()> { proxy_listener, cancellation_token.clone(), )); - let signals_task = tokio::spawn(proxy::handle_signals(cancellation_token)); + let signals_task = tokio::spawn(proxy_core::handle_signals(cancellation_token)); // the signal task cant ever succeed. // the main task can error, or can succeed on cancellation. // we want to immediately exit on either of these cases let signal = match futures::future::select(signals_task, main).await { - Either::Left((res, _)) => proxy::flatten_err(res)?, - Either::Right((res, _)) => return proxy::flatten_err(res), + Either::Left((res, _)) => proxy_core::flatten_err(res)?, + Either::Right((res, _)) => return proxy_core::flatten_err(res), }; // maintenance tasks return `Infallible` success values, this is an impossible value @@ -181,7 +181,7 @@ async fn task_main( let ctx = RequestMonitoring::new( session_id, peer_addr.ip(), - proxy::metrics::Protocol::SniRouter, + proxy_core::metrics::Protocol::SniRouter, "sni", ); handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await @@ -250,7 +250,7 @@ async fn ssl_handshake( "unexpected startup packet, rejecting connection" ); stream - .throw_error_str(ERR_INSECURE_CONNECTION, proxy::error::ErrorKind::User) + .throw_error_str(ERR_INSECURE_CONNECTION, proxy_core::error::ErrorKind::User) .await? } } diff --git a/proxy/proxy/Cargo.toml b/proxy/proxy/Cargo.toml new file mode 100644 index 0000000000..153437c294 --- /dev/null +++ b/proxy/proxy/Cargo.toml @@ -0,0 +1,129 @@ +[package] +name = "proxy" +version = "0.1.0" +edition.workspace = true +license.workspace = true + +[features] +default = [] +testing = [] + +[dependencies] +proxy-sasl = { version = "0.1", path = "../sasl" } +proxy-core = { version = "0.1", path = "../core" } + +# ahash.workspace = true +anyhow.workspace = true +# arc-swap.workspace = true +# async-compression.workspace = true +# async-trait.workspace = true +# atomic-take.workspace = true +aws-config.workspace = true +# aws-sdk-iam.workspace = true +# aws-sigv4.workspace = true +# aws-types.workspace = true +# base64.workspace = true +# bstr.workspace = true +# bytes = { workspace = true, features = ["serde"] } +# camino.workspace = true +# chrono.workspace = true +clap.workspace = true +# consumption_metrics.workspace = true +# crossbeam-deque.workspace = true +# dashmap.workspace = true +# env_logger.workspace = true +# framed-websockets.workspace = true +futures.workspace = true +git-version.workspace = true +# hashbrown.workspace = true +# hashlink.workspace = true +# hex.workspace = true +# hmac.workspace = true +# hostname.workspace = true +# http.workspace = true +humantime.workspace = true +# humantime-serde.workspace = true +# hyper.workspace = true +# hyper1 = { package = "hyper", version = "1.2", features = ["server"] } +# hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] } +# http-body-util = { version = "0.1" } +# indexmap.workspace = true +# ipnet.workspace = true +itertools.workspace = true +# lasso = { workspace = true, features = ["multi-threaded"] } +# md5.workspace = true +# measured = { workspace = true, features = ["lasso"] } +metrics.workspace = true +# once_cell.workspace = true +# opentelemetry.workspace = true +# parking_lot.workspace = true +# parquet.workspace = true +# parquet_derive.workspace = true +# pin-project-lite.workspace = true +# postgres_backend.workspace = true +pq_proto.workspace = true +# # prometheus.workspace = true +# rand.workspace = true +# regex.workspace = true +remote_storage = { version = "0.1", path = "../../libs/remote_storage/" } +# reqwest.workspace = true +# reqwest-middleware = { workspace = true, features = ["json"] } +# reqwest-retry.workspace = true +# reqwest-tracing.workspace = true +# routerify.workspace = true +# rustc-hash.workspace = true +rustls-pemfile.workspace = true +rustls.workspace = true +# scopeguard.workspace = true +# serde.workspace = true +# serde_json.workspace = true +# sha2 = { workspace = true, features = ["asm", "oid"] } +# smol_str.workspace = true +# smallvec.workspace = true +socket2.workspace = true +# subtle.workspace = true +# task-local-extensions.workspace = true +# thiserror.workspace = true +tikv-jemallocator.workspace = true +# tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] } +# tokio-postgres.workspace = true +# tokio-postgres-rustls.workspace = true +# tokio-rustls.workspace = true +tokio-util.workspace = true +tokio = { workspace = true, features = ["signal"] } +# tower-service.workspace = true +# tracing-opentelemetry.workspace = true +# tracing-subscriber.workspace = true +tracing-utils.workspace = true +tracing.workspace = true +# try-lock.workspace = true +# typed-json.workspace = true +# url.workspace = true +# urlencoding.workspace = true +utils.workspace = true +uuid.workspace = true +# rustls-native-certs.workspace = true +# x509-parser.workspace = true +# postgres-protocol.workspace = true +# redis.workspace = true + +# # jwt stuff +# jose-jwa = "0.1.2" +# jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] } +# signature = "2" +# ecdsa = "0.16" +# p256 = "0.13" +# rsa = "0.9" + +# workspace_hack.workspace = true + +# [dev-dependencies] +# camino-tempfile.workspace = true +# fallible-iterator.workspace = true +# tokio-tungstenite.workspace = true +# pbkdf2 = { workspace = true, features = ["simple", "std"] } +# rcgen.workspace = true +# rstest.workspace = true +# tokio-postgres-rustls.workspace = true +# walkdir.workspace = true +# rand_distr = "0.4" diff --git a/proxy/core/src/bin/proxy.rs b/proxy/proxy/src/main.rs similarity index 93% rename from proxy/core/src/bin/proxy.rs rename to proxy/proxy/src/main.rs index b9c43e017d..ae3796c485 100644 --- a/proxy/core/src/bin/proxy.rs +++ b/proxy/proxy/src/main.rs @@ -7,36 +7,36 @@ use aws_config::provider_config::ProviderConfig; use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider; use aws_config::Region; use futures::future::Either; -use proxy::auth; -use proxy::auth::backend::AuthRateLimiter; -use proxy::auth::backend::MaybeOwned; -use proxy::cancellation::CancelMap; -use proxy::cancellation::CancellationHandler; -use proxy::config::remote_storage_from_toml; -use proxy::config::AuthenticationConfig; -use proxy::config::CacheOptions; -use proxy::config::HttpConfig; -use proxy::config::ProjectInfoCacheOptions; -use proxy::console; -use proxy::context::parquet::ParquetUploadArgs; -use proxy::http; -use proxy::http::health_server::AppMetrics; -use proxy::metrics::Metrics; -use proxy::rate_limiter::EndpointRateLimiter; -use proxy::rate_limiter::LeakyBucketConfig; -use proxy::rate_limiter::RateBucketInfo; -use proxy::rate_limiter::WakeComputeRateLimiter; -use proxy::redis::cancellation_publisher::RedisPublisherClient; -use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider; -use proxy::redis::elasticache; -use proxy::redis::notifications; -use proxy::serverless::cancel_set::CancelSet; -use proxy::serverless::GlobalConnPoolOptions; -use proxy::usage_metrics; +use proxy_core::auth; +use proxy_core::auth::backend::AuthRateLimiter; +use proxy_core::auth::backend::MaybeOwned; +use proxy_core::cancellation::CancelMap; +use proxy_core::cancellation::CancellationHandler; +use proxy_core::config::remote_storage_from_toml; +use proxy_core::config::AuthenticationConfig; +use proxy_core::config::CacheOptions; +use proxy_core::config::HttpConfig; +use proxy_core::config::ProjectInfoCacheOptions; +use proxy_core::console; +use proxy_core::context::parquet::ParquetUploadArgs; +use proxy_core::http; +use proxy_core::http::health_server::AppMetrics; +use proxy_core::metrics::Metrics; +use proxy_core::rate_limiter::EndpointRateLimiter; +use proxy_core::rate_limiter::LeakyBucketConfig; +use proxy_core::rate_limiter::RateBucketInfo; +use proxy_core::rate_limiter::WakeComputeRateLimiter; +use proxy_core::redis::cancellation_publisher::RedisPublisherClient; +use proxy_core::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider; +use proxy_core::redis::elasticache; +use proxy_core::redis::notifications; +use proxy_core::serverless::cancel_set::CancelSet; +use proxy_core::serverless::GlobalConnPoolOptions; +use proxy_core::usage_metrics; use anyhow::bail; -use proxy::config::{self, ProxyConfig}; -use proxy::serverless; +use proxy_core::config::{self, ProxyConfig}; +use proxy_core::serverless; use proxy_sasl::scram::threadpool::ThreadPool; use remote_storage::RemoteStorageConfig; use std::net::SocketAddr; @@ -268,7 +268,7 @@ struct SqlOverHttpArgs { #[tokio::main] async fn main() -> anyhow::Result<()> { - let _logging_guard = proxy::logging::init().await?; + let _logging_guard = proxy_core::logging::init().await?; let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); @@ -279,7 +279,7 @@ async fn main() -> anyhow::Result<()> { build_tag: BUILD_TAG, }); - let jemalloc = match proxy::jemalloc::MetricRecorder::new() { + let jemalloc = match proxy_core::jemalloc::MetricRecorder::new() { Ok(t) => Some(t), Err(e) => { tracing::error!(error = ?e, "could not start jemalloc metrics loop"); @@ -394,7 +394,7 @@ async fn main() -> anyhow::Result<()> { >::new( cancel_map.clone(), redis_publisher, - proxy::metrics::CancellationSource::FromClient, + proxy_core::metrics::CancellationSource::FromClient, )); // bit of a hack - find the min rps and max rps supported and turn it into @@ -419,7 +419,7 @@ async fn main() -> anyhow::Result<()> { // client facing tasks. these will exit on error or on cancellation // cancellation returns Ok(()) let mut client_tasks = JoinSet::new(); - client_tasks.spawn(proxy::proxy::task_main( + client_tasks.spawn(proxy_core::proxy::task_main( config, proxy_listener, cancellation_token.clone(), @@ -443,20 +443,20 @@ async fn main() -> anyhow::Result<()> { )); } - client_tasks.spawn(proxy::context::parquet::worker( + client_tasks.spawn(proxy_core::context::parquet::worker( cancellation_token.clone(), args.parquet_upload, )); // maintenance tasks. these never return unless there's an error let mut maintenance_tasks = JoinSet::new(); - maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone())); + maintenance_tasks.spawn(proxy_core::handle_signals(cancellation_token.clone())); maintenance_tasks.spawn(http::health_server::task_main( http_listener, AppMetrics { jemalloc, neon_metrics, - proxy: proxy::metrics::Metrics::get(), + proxy: proxy_core::metrics::Metrics::get(), }, )); maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener)); @@ -471,7 +471,7 @@ async fn main() -> anyhow::Result<()> { } if let auth::BackendType::Console(api, _) = &config.auth_backend { - if let proxy::console::provider::ConsoleBackend::Console(api) = &**api { + if let proxy_core::console::provider::ConsoleBackend::Console(api) = &**api { match (redis_notifications_client, regional_redis_client.clone()) { (None, None) => {} (client1, client2) => { @@ -516,11 +516,11 @@ async fn main() -> anyhow::Result<()> { .await { // exit immediately on maintenance task completion - Either::Left((Some(res), _)) => break proxy::flatten_err(res)?, + Either::Left((Some(res), _)) => break proxy_core::flatten_err(res)?, // exit with error immediately if all maintenance tasks have ceased (should be caught by branch above) Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"), // exit immediately on client task error - Either::Right((Some(res), _)) => proxy::flatten_err(res)?, + Either::Right((Some(res), _)) => proxy_core::flatten_err(res)?, // exit if all our client tasks have shutdown gracefully Either::Right((None, _)) => return Ok(()), } @@ -707,7 +707,7 @@ mod tests { use std::time::Duration; use clap::Parser; - use proxy::rate_limiter::RateBucketInfo; + use proxy_core::rate_limiter::RateBucketInfo; #[test] fn parse_endpoint_rps_limit() {