split out binaries

This commit is contained in:
Conrad Ludgate
2024-08-13 14:29:38 +01:00
parent b62e7c0138
commit 3df6d368e3
7 changed files with 361 additions and 50 deletions

51
Cargo.lock generated
View File

@@ -4016,6 +4016,29 @@ dependencies = [
"indexmap 1.9.3",
]
[[package]]
name = "pg_sni_router"
version = "0.1.0"
dependencies = [
"anyhow",
"clap",
"futures",
"git-version",
"itertools 0.10.5",
"pq_proto",
"proxy-core",
"proxy-sasl",
"rustls 0.22.4",
"rustls-pemfile 2.1.1",
"socket2 0.5.5",
"tokio",
"tokio-util",
"tracing",
"tracing-utils",
"utils",
"uuid",
]
[[package]]
name = "phf"
version = "0.11.1"
@@ -4413,6 +4436,34 @@ dependencies = [
[[package]]
name = "proxy"
version = "0.1.0"
dependencies = [
"anyhow",
"aws-config",
"clap",
"futures",
"git-version",
"humantime",
"itertools 0.10.5",
"metrics",
"pq_proto",
"proxy-core",
"proxy-sasl",
"remote_storage",
"rustls 0.22.4",
"rustls-pemfile 2.1.1",
"socket2 0.5.5",
"tikv-jemallocator",
"tokio",
"tokio-util",
"tracing",
"tracing-utils",
"utils",
"uuid",
]
[[package]]
name = "proxy-core"
version = "0.1.0"
dependencies = [
"ahash",
"anyhow",

View File

@@ -11,6 +11,8 @@ members = [
"pageserver/pagebench",
"proxy/core",
"proxy/sasl",
"proxy/proxy",
"proxy/pg_sni_router",
"safekeeper",
"storage_broker",
"storage_controller",

View File

@@ -1,5 +1,5 @@
[package]
name = "proxy"
name = "proxy-core"
version = "0.1.0"
edition.workspace = true
license.workspace = true

View File

@@ -0,0 +1,129 @@
[package]
name = "pg_sni_router"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
proxy-sasl = { version = "0.1", path = "../sasl" }
proxy-core = { version = "0.1", path = "../core" }
# ahash.workspace = true
anyhow.workspace = true
# arc-swap.workspace = true
# async-compression.workspace = true
# async-trait.workspace = true
# atomic-take.workspace = true
# aws-config.workspace = true
# aws-sdk-iam.workspace = true
# aws-sigv4.workspace = true
# aws-types.workspace = true
# base64.workspace = true
# bstr.workspace = true
# bytes = { workspace = true, features = ["serde"] }
# camino.workspace = true
# chrono.workspace = true
clap.workspace = true
# consumption_metrics.workspace = true
# crossbeam-deque.workspace = true
# dashmap.workspace = true
# env_logger.workspace = true
# framed-websockets.workspace = true
futures.workspace = true
git-version.workspace = true
# hashbrown.workspace = true
# hashlink.workspace = true
# hex.workspace = true
# hmac.workspace = true
# hostname.workspace = true
# http.workspace = true
# humantime.workspace = true
# humantime-serde.workspace = true
# hyper.workspace = true
# hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
# hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
# http-body-util = { version = "0.1" }
# indexmap.workspace = true
# ipnet.workspace = true
itertools.workspace = true
# lasso = { workspace = true, features = ["multi-threaded"] }
# md5.workspace = true
# measured = { workspace = true, features = ["lasso"] }
# metrics.workspace = true
# once_cell.workspace = true
# opentelemetry.workspace = true
# parking_lot.workspace = true
# parquet.workspace = true
# parquet_derive.workspace = true
# pin-project-lite.workspace = true
# postgres_backend.workspace = true
pq_proto.workspace = true
# # prometheus.workspace = true
# rand.workspace = true
# regex.workspace = true
# remote_storage = { version = "0.1", path = "../../libs/remote_storage/" }
# reqwest.workspace = true
# reqwest-middleware = { workspace = true, features = ["json"] }
# reqwest-retry.workspace = true
# reqwest-tracing.workspace = true
# routerify.workspace = true
# rustc-hash.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
# scopeguard.workspace = true
# serde.workspace = true
# serde_json.workspace = true
# sha2 = { workspace = true, features = ["asm", "oid"] }
# smol_str.workspace = true
# smallvec.workspace = true
socket2.workspace = true
# subtle.workspace = true
# task-local-extensions.workspace = true
# thiserror.workspace = true
# tikv-jemallocator.workspace = true
# tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
# tokio-postgres.workspace = true
# tokio-postgres-rustls.workspace = true
# tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
# tower-service.workspace = true
# tracing-opentelemetry.workspace = true
# tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
# try-lock.workspace = true
# typed-json.workspace = true
# url.workspace = true
# urlencoding.workspace = true
utils.workspace = true
uuid.workspace = true
# rustls-native-certs.workspace = true
# x509-parser.workspace = true
# postgres-protocol.workspace = true
# redis.workspace = true
# # jwt stuff
# jose-jwa = "0.1.2"
# jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
# signature = "2"
# ecdsa = "0.16"
# p256 = "0.13"
# rsa = "0.9"
# workspace_hack.workspace = true
# [dev-dependencies]
# camino-tempfile.workspace = true
# fallible-iterator.workspace = true
# tokio-tungstenite.workspace = true
# pbkdf2 = { workspace = true, features = ["simple", "std"] }
# rcgen.workspace = true
# rstest.workspace = true
# tokio-postgres-rustls.workspace = true
# walkdir.workspace = true
# rand_distr = "0.4"

View File

@@ -7,9 +7,9 @@ use std::{net::SocketAddr, sync::Arc};
use futures::future::Either;
use itertools::Itertools;
use proxy::context::RequestMonitoring;
use proxy::metrics::Metrics;
use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
use proxy_core::context::RequestMonitoring;
use proxy_core::metrics::Metrics;
use proxy_core::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
use proxy_sasl::scram::threadpool::ThreadPoolMetrics;
use proxy_sasl::scram::TlsServerEndPoint;
use rustls::pki_types::PrivateKeyDer;
@@ -18,7 +18,7 @@ use tokio::net::TcpListener;
use anyhow::{anyhow, bail, ensure, Context};
use clap::Arg;
use futures::TryFutureExt;
use proxy::stream::{PqStream, Stream};
use proxy_core::stream::{PqStream, Stream};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_util::sync::CancellationToken;
@@ -63,7 +63,7 @@ fn cli() -> clap::Command {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _logging_guard = proxy_core::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
@@ -134,14 +134,14 @@ async fn main() -> anyhow::Result<()> {
proxy_listener,
cancellation_token.clone(),
));
let signals_task = tokio::spawn(proxy::handle_signals(cancellation_token));
let signals_task = tokio::spawn(proxy_core::handle_signals(cancellation_token));
// the signal task cant ever succeed.
// the main task can error, or can succeed on cancellation.
// we want to immediately exit on either of these cases
let signal = match futures::future::select(signals_task, main).await {
Either::Left((res, _)) => proxy::flatten_err(res)?,
Either::Right((res, _)) => return proxy::flatten_err(res),
Either::Left((res, _)) => proxy_core::flatten_err(res)?,
Either::Right((res, _)) => return proxy_core::flatten_err(res),
};
// maintenance tasks return `Infallible` success values, this is an impossible value
@@ -181,7 +181,7 @@ async fn task_main(
let ctx = RequestMonitoring::new(
session_id,
peer_addr.ip(),
proxy::metrics::Protocol::SniRouter,
proxy_core::metrics::Protocol::SniRouter,
"sni",
);
handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
@@ -250,7 +250,7 @@ async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
"unexpected startup packet, rejecting connection"
);
stream
.throw_error_str(ERR_INSECURE_CONNECTION, proxy::error::ErrorKind::User)
.throw_error_str(ERR_INSECURE_CONNECTION, proxy_core::error::ErrorKind::User)
.await?
}
}

129
proxy/proxy/Cargo.toml Normal file
View File

@@ -0,0 +1,129 @@
[package]
name = "proxy"
version = "0.1.0"
edition.workspace = true
license.workspace = true
[features]
default = []
testing = []
[dependencies]
proxy-sasl = { version = "0.1", path = "../sasl" }
proxy-core = { version = "0.1", path = "../core" }
# ahash.workspace = true
anyhow.workspace = true
# arc-swap.workspace = true
# async-compression.workspace = true
# async-trait.workspace = true
# atomic-take.workspace = true
aws-config.workspace = true
# aws-sdk-iam.workspace = true
# aws-sigv4.workspace = true
# aws-types.workspace = true
# base64.workspace = true
# bstr.workspace = true
# bytes = { workspace = true, features = ["serde"] }
# camino.workspace = true
# chrono.workspace = true
clap.workspace = true
# consumption_metrics.workspace = true
# crossbeam-deque.workspace = true
# dashmap.workspace = true
# env_logger.workspace = true
# framed-websockets.workspace = true
futures.workspace = true
git-version.workspace = true
# hashbrown.workspace = true
# hashlink.workspace = true
# hex.workspace = true
# hmac.workspace = true
# hostname.workspace = true
# http.workspace = true
humantime.workspace = true
# humantime-serde.workspace = true
# hyper.workspace = true
# hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
# hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
# http-body-util = { version = "0.1" }
# indexmap.workspace = true
# ipnet.workspace = true
itertools.workspace = true
# lasso = { workspace = true, features = ["multi-threaded"] }
# md5.workspace = true
# measured = { workspace = true, features = ["lasso"] }
metrics.workspace = true
# once_cell.workspace = true
# opentelemetry.workspace = true
# parking_lot.workspace = true
# parquet.workspace = true
# parquet_derive.workspace = true
# pin-project-lite.workspace = true
# postgres_backend.workspace = true
pq_proto.workspace = true
# # prometheus.workspace = true
# rand.workspace = true
# regex.workspace = true
remote_storage = { version = "0.1", path = "../../libs/remote_storage/" }
# reqwest.workspace = true
# reqwest-middleware = { workspace = true, features = ["json"] }
# reqwest-retry.workspace = true
# reqwest-tracing.workspace = true
# routerify.workspace = true
# rustc-hash.workspace = true
rustls-pemfile.workspace = true
rustls.workspace = true
# scopeguard.workspace = true
# serde.workspace = true
# serde_json.workspace = true
# sha2 = { workspace = true, features = ["asm", "oid"] }
# smol_str.workspace = true
# smallvec.workspace = true
socket2.workspace = true
# subtle.workspace = true
# task-local-extensions.workspace = true
# thiserror.workspace = true
tikv-jemallocator.workspace = true
# tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
# tokio-postgres.workspace = true
# tokio-postgres-rustls.workspace = true
# tokio-rustls.workspace = true
tokio-util.workspace = true
tokio = { workspace = true, features = ["signal"] }
# tower-service.workspace = true
# tracing-opentelemetry.workspace = true
# tracing-subscriber.workspace = true
tracing-utils.workspace = true
tracing.workspace = true
# try-lock.workspace = true
# typed-json.workspace = true
# url.workspace = true
# urlencoding.workspace = true
utils.workspace = true
uuid.workspace = true
# rustls-native-certs.workspace = true
# x509-parser.workspace = true
# postgres-protocol.workspace = true
# redis.workspace = true
# # jwt stuff
# jose-jwa = "0.1.2"
# jose-jwk = { version = "0.1.2", features = ["p256", "p384", "rsa"] }
# signature = "2"
# ecdsa = "0.16"
# p256 = "0.13"
# rsa = "0.9"
# workspace_hack.workspace = true
# [dev-dependencies]
# camino-tempfile.workspace = true
# fallible-iterator.workspace = true
# tokio-tungstenite.workspace = true
# pbkdf2 = { workspace = true, features = ["simple", "std"] }
# rcgen.workspace = true
# rstest.workspace = true
# tokio-postgres-rustls.workspace = true
# walkdir.workspace = true
# rand_distr = "0.4"

View File

@@ -7,36 +7,36 @@ use aws_config::provider_config::ProviderConfig;
use aws_config::web_identity_token::WebIdentityTokenCredentialsProvider;
use aws_config::Region;
use futures::future::Either;
use proxy::auth;
use proxy::auth::backend::AuthRateLimiter;
use proxy::auth::backend::MaybeOwned;
use proxy::cancellation::CancelMap;
use proxy::cancellation::CancellationHandler;
use proxy::config::remote_storage_from_toml;
use proxy::config::AuthenticationConfig;
use proxy::config::CacheOptions;
use proxy::config::HttpConfig;
use proxy::config::ProjectInfoCacheOptions;
use proxy::console;
use proxy::context::parquet::ParquetUploadArgs;
use proxy::http;
use proxy::http::health_server::AppMetrics;
use proxy::metrics::Metrics;
use proxy::rate_limiter::EndpointRateLimiter;
use proxy::rate_limiter::LeakyBucketConfig;
use proxy::rate_limiter::RateBucketInfo;
use proxy::rate_limiter::WakeComputeRateLimiter;
use proxy::redis::cancellation_publisher::RedisPublisherClient;
use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use proxy::redis::elasticache;
use proxy::redis::notifications;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::usage_metrics;
use proxy_core::auth;
use proxy_core::auth::backend::AuthRateLimiter;
use proxy_core::auth::backend::MaybeOwned;
use proxy_core::cancellation::CancelMap;
use proxy_core::cancellation::CancellationHandler;
use proxy_core::config::remote_storage_from_toml;
use proxy_core::config::AuthenticationConfig;
use proxy_core::config::CacheOptions;
use proxy_core::config::HttpConfig;
use proxy_core::config::ProjectInfoCacheOptions;
use proxy_core::console;
use proxy_core::context::parquet::ParquetUploadArgs;
use proxy_core::http;
use proxy_core::http::health_server::AppMetrics;
use proxy_core::metrics::Metrics;
use proxy_core::rate_limiter::EndpointRateLimiter;
use proxy_core::rate_limiter::LeakyBucketConfig;
use proxy_core::rate_limiter::RateBucketInfo;
use proxy_core::rate_limiter::WakeComputeRateLimiter;
use proxy_core::redis::cancellation_publisher::RedisPublisherClient;
use proxy_core::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use proxy_core::redis::elasticache;
use proxy_core::redis::notifications;
use proxy_core::serverless::cancel_set::CancelSet;
use proxy_core::serverless::GlobalConnPoolOptions;
use proxy_core::usage_metrics;
use anyhow::bail;
use proxy::config::{self, ProxyConfig};
use proxy::serverless;
use proxy_core::config::{self, ProxyConfig};
use proxy_core::serverless;
use proxy_sasl::scram::threadpool::ThreadPool;
use remote_storage::RemoteStorageConfig;
use std::net::SocketAddr;
@@ -268,7 +268,7 @@ struct SqlOverHttpArgs {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _logging_guard = proxy_core::logging::init().await?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
@@ -279,7 +279,7 @@ async fn main() -> anyhow::Result<()> {
build_tag: BUILD_TAG,
});
let jemalloc = match proxy::jemalloc::MetricRecorder::new() {
let jemalloc = match proxy_core::jemalloc::MetricRecorder::new() {
Ok(t) => Some(t),
Err(e) => {
tracing::error!(error = ?e, "could not start jemalloc metrics loop");
@@ -394,7 +394,7 @@ async fn main() -> anyhow::Result<()> {
>::new(
cancel_map.clone(),
redis_publisher,
proxy::metrics::CancellationSource::FromClient,
proxy_core::metrics::CancellationSource::FromClient,
));
// bit of a hack - find the min rps and max rps supported and turn it into
@@ -419,7 +419,7 @@ async fn main() -> anyhow::Result<()> {
// client facing tasks. these will exit on error or on cancellation
// cancellation returns Ok(())
let mut client_tasks = JoinSet::new();
client_tasks.spawn(proxy::proxy::task_main(
client_tasks.spawn(proxy_core::proxy::task_main(
config,
proxy_listener,
cancellation_token.clone(),
@@ -443,20 +443,20 @@ async fn main() -> anyhow::Result<()> {
));
}
client_tasks.spawn(proxy::context::parquet::worker(
client_tasks.spawn(proxy_core::context::parquet::worker(
cancellation_token.clone(),
args.parquet_upload,
));
// maintenance tasks. these never return unless there's an error
let mut maintenance_tasks = JoinSet::new();
maintenance_tasks.spawn(proxy::handle_signals(cancellation_token.clone()));
maintenance_tasks.spawn(proxy_core::handle_signals(cancellation_token.clone()));
maintenance_tasks.spawn(http::health_server::task_main(
http_listener,
AppMetrics {
jemalloc,
neon_metrics,
proxy: proxy::metrics::Metrics::get(),
proxy: proxy_core::metrics::Metrics::get(),
},
));
maintenance_tasks.spawn(console::mgmt::task_main(mgmt_listener));
@@ -471,7 +471,7 @@ async fn main() -> anyhow::Result<()> {
}
if let auth::BackendType::Console(api, _) = &config.auth_backend {
if let proxy::console::provider::ConsoleBackend::Console(api) = &**api {
if let proxy_core::console::provider::ConsoleBackend::Console(api) = &**api {
match (redis_notifications_client, regional_redis_client.clone()) {
(None, None) => {}
(client1, client2) => {
@@ -516,11 +516,11 @@ async fn main() -> anyhow::Result<()> {
.await
{
// exit immediately on maintenance task completion
Either::Left((Some(res), _)) => break proxy::flatten_err(res)?,
Either::Left((Some(res), _)) => break proxy_core::flatten_err(res)?,
// exit with error immediately if all maintenance tasks have ceased (should be caught by branch above)
Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"),
// exit immediately on client task error
Either::Right((Some(res), _)) => proxy::flatten_err(res)?,
Either::Right((Some(res), _)) => proxy_core::flatten_err(res)?,
// exit if all our client tasks have shutdown gracefully
Either::Right((None, _)) => return Ok(()),
}
@@ -707,7 +707,7 @@ mod tests {
use std::time::Duration;
use clap::Parser;
use proxy::rate_limiter::RateBucketInfo;
use proxy_core::rate_limiter::RateBucketInfo;
#[test]
fn parse_endpoint_rps_limit() {