diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs
index 8d8a4c124a..8f225dc1e0 100644
--- a/proxy/src/bin/local_proxy.rs
+++ b/proxy/src/bin/local_proxy.rs
@@ -1,416 +1,7 @@
-use std::net::SocketAddr;
-use std::pin::pin;
-use std::str::FromStr;
-use std::sync::Arc;
-use std::time::Duration;
-
-use anyhow::{bail, ensure, Context};
-use camino::{Utf8Path, Utf8PathBuf};
-use compute_api::spec::LocalProxySpec;
-use futures::future::Either;
-use proxy::auth::backend::jwt::JwkCache;
-use proxy::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
-use proxy::auth::{self};
-use proxy::cancellation::CancellationHandler;
-use proxy::config::{
-    self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
-};
-use proxy::control_plane::locks::ApiLocks;
-use proxy::control_plane::messages::{EndpointJwksResponse, JwksSettings};
-use proxy::http::health_server::AppMetrics;
-use proxy::intern::RoleNameInt;
-use proxy::metrics::{Metrics, ThreadPoolMetrics};
-use proxy::rate_limiter::{
-    BucketRateLimiter, EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo,
-};
-use proxy::scram::threadpool::ThreadPool;
-use proxy::serverless::cancel_set::CancelSet;
-use proxy::serverless::{self, GlobalConnPoolOptions};
-use proxy::tls::client_config::compute_client_config_with_root_certs;
-use proxy::types::RoleName;
-use proxy::url::ApiUrl;
-
-project_git_version!(GIT_VERSION);
-project_build_tag!(BUILD_TAG);
-
-use clap::Parser;
-use thiserror::Error;
-use tokio::net::TcpListener;
-use tokio::sync::Notify;
-use tokio::task::JoinSet;
-use tokio_util::sync::CancellationToken;
-use tracing::{debug, error, info, warn};
-use utils::sentry_init::init_sentry;
-use utils::{pid_file, project_build_tag, project_git_version};
-
 #[global_allocator]
 static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
-/// Neon proxy/router
-#[derive(Parser)]
-#[command(version = GIT_VERSION, about)]
-struct LocalProxyCliArgs {
-    /// listen for incoming metrics connections on ip:port
-    #[clap(long, default_value = "127.0.0.1:7001")]
-    metrics: String,
-    /// listen for incoming http connections on ip:port
-    #[clap(long)]
-    http: String,
-    /// timeout for the TLS handshake
-    #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
-    handshake_timeout: tokio::time::Duration,
-    /// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
-    #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
-    connect_compute_lock: String,
-    #[clap(flatten)]
-    sql_over_http: SqlOverHttpArgs,
-    /// User rate limiter max number of requests per second.
-    ///
-    /// Provided in the form `<Requests Per Second>@<Bucket Duration Size>`.
-    /// Can be given multiple times for different bucket sizes.
-    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
-    user_rps_limit: Vec<RateBucketInfo>,
-    /// Whether the auth rate limiter actually takes effect (for testing)
-    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
-    auth_rate_limit_enabled: bool,
-    /// Authentication rate limiter max number of hashes per second.
-    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
-    auth_rate_limit: Vec<RateBucketInfo>,
-    /// The IP subnet to use when considering whether two IP addresses are considered the same.
-    #[clap(long, default_value_t = 64)]
-    auth_rate_limit_ip_subnet: u8,
-    /// Whether to retry the connection to the compute node
-    #[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
-    connect_to_compute_retry: String,
-    /// Address of the postgres server
-    #[clap(long, default_value = "127.0.0.1:5432")]
-    postgres: SocketAddr,
-    /// Address of the internal compute-ctl api service
-    #[clap(long, default_value = "http://127.0.0.1:3081/")]
-    compute_ctl: ApiUrl,
-    /// Path of the local proxy config file
-    #[clap(long, default_value = "./local_proxy.json")]
-    config_path: Utf8PathBuf,
-    /// Path of the local proxy PID file
-    #[clap(long, default_value = "./local_proxy.pid")]
-    pid_path: Utf8PathBuf,
-}
-
-#[derive(clap::Args, Clone, Copy, Debug)]
-struct SqlOverHttpArgs {
-    /// How many connections to pool for each endpoint. Excess connections are discarded
-    #[clap(long, default_value_t = 200)]
-    sql_over_http_pool_max_total_conns: usize,
-
-    /// How long pooled connections should remain idle for before closing
-    #[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
-    sql_over_http_idle_timeout: tokio::time::Duration,
-
-    #[clap(long, default_value_t = 100)]
-    sql_over_http_client_conn_threshold: u64,
-
-    #[clap(long, default_value_t = 16)]
-    sql_over_http_cancel_set_shards: usize,
-
-    #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
-    sql_over_http_max_request_size_bytes: usize,
-
-    #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
-    sql_over_http_max_response_size_bytes: usize,
-}
-
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let _logging_guard = proxy::logging::init_local_proxy()?;
-    let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
-    let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
-
-    Metrics::install(Arc::new(ThreadPoolMetrics::new(0)));
-
-    // TODO: refactor these to use labels
-    debug!("Version: {GIT_VERSION}");
-    debug!("Build_tag: {BUILD_TAG}");
-    let neon_metrics = ::metrics::NeonMetrics::new(::metrics::BuildInfo {
-        revision: GIT_VERSION,
-        build_tag: BUILD_TAG,
-    });
-
-    let jemalloc = match proxy::jemalloc::MetricRecorder::new() {
-        Ok(t) => Some(t),
-        Err(e) => {
-            tracing::error!(error = ?e, "could not start jemalloc metrics loop");
-            None
-        }
-    };
-
-    let args = LocalProxyCliArgs::parse();
-    let config = build_config(&args)?;
-    let auth_backend = build_auth_backend(&args)?;
-
-    // before we bind to any ports, write the process ID to a file
-    // so that compute-ctl can find our process later
-    // in order to trigger the appropriate SIGHUP on config change.
-    //
-    // This also claims a "lock" that makes sure only one instance
-    // of local_proxy runs at a time.
-    let _process_guard = loop {
-        match pid_file::claim_for_current_process(&args.pid_path) {
-            Ok(guard) => break guard,
-            Err(e) => {
-                // compute-ctl might have tried to read the pid-file to let us
-                // know about some config change. We should try again.
-                error!(path=?args.pid_path, "could not claim PID file guard: {e:?}");
-                tokio::time::sleep(Duration::from_secs(1)).await;
-            }
-        }
-    };
-
-    let metrics_listener = TcpListener::bind(args.metrics).await?.into_std()?;
-    let http_listener = TcpListener::bind(args.http).await?;
-    let shutdown = CancellationToken::new();
-
-    // todo: should scale with CU
-    let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
-        LeakyBucketConfig {
-            rps: 10.0,
-            max: 100.0,
-        },
-        16,
-    ));
-
-    let mut maintenance_tasks = JoinSet::new();
-
-    let refresh_config_notify = Arc::new(Notify::new());
-    maintenance_tasks.spawn(proxy::signals::handle(shutdown.clone(), {
-        let refresh_config_notify = Arc::clone(&refresh_config_notify);
-        move || {
-            refresh_config_notify.notify_one();
-        }
-    }));
-
-    // trigger the first config load **after** setting up the signal hook
-    // to avoid the race condition where:
-    // 1. No config file registered when local_proxy starts up
-    // 2. The config file is written but the signal hook is not yet received
-    // 3. local_proxy completes startup but has no config loaded, despite there being a registerd config.
-    refresh_config_notify.notify_one();
-    tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify));
-
-    maintenance_tasks.spawn(proxy::http::health_server::task_main(
-        metrics_listener,
-        AppMetrics {
-            jemalloc,
-            neon_metrics,
-            proxy: proxy::metrics::Metrics::get(),
-        },
-    ));
-
-    let task = serverless::task_main(
-        config,
-        auth_backend,
-        http_listener,
-        shutdown.clone(),
-        Arc::new(CancellationHandler::new(&config.connect_to_compute, None)),
-        endpoint_rate_limiter,
-    );
-
-    match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await {
-        // exit immediately on maintenance task completion
-        Either::Left((Some(res), _)) => match proxy::error::flatten_err(res)? {},
-        // exit with error immediately if all maintenance tasks have ceased (should be caught by branch above)
-        Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"),
-        // exit immediately on client task error
-        Either::Right((res, _)) => res?,
-    }
-
-    Ok(())
-}
-
-/// ProxyConfig is created at proxy startup, and lives forever.
-fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
-    let config::ConcurrencyLockOptions {
-        shards,
-        limiter,
-        epoch,
-        timeout,
-    } = args.connect_compute_lock.parse()?;
-    info!(
-        ?limiter,
-        shards,
-        ?epoch,
-        "Using NodeLocks (connect_compute)"
-    );
-    let connect_compute_locks = ApiLocks::new(
-        "connect_compute_lock",
-        limiter,
-        shards,
-        timeout,
-        epoch,
-        &Metrics::get().proxy.connect_compute_lock,
-    )?;
-
-    let http_config = HttpConfig {
-        accept_websockets: false,
-        pool_options: GlobalConnPoolOptions {
-            gc_epoch: Duration::from_secs(60),
-            pool_shards: 2,
-            idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
-            opt_in: false,
-
-            max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_total_conns,
-            max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
-        },
-        cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
-        client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
-        max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes,
-        max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
-    };
-
-    let compute_config = ComputeConfig {
-        retry: RetryConfig::parse(RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)?,
-        tls: Arc::new(compute_client_config_with_root_certs()?),
-        timeout: Duration::from_secs(2),
-    };
-
-    Ok(Box::leak(Box::new(ProxyConfig {
-        tls_config: None,
-        metric_collection: None,
-        http_config,
-        authentication_config: AuthenticationConfig {
-            jwks_cache: JwkCache::default(),
-            thread_pool: ThreadPool::new(0),
-            scram_protocol_timeout: Duration::from_secs(10),
-            rate_limiter_enabled: false,
-            rate_limiter: BucketRateLimiter::new(vec![]),
-            rate_limit_ip_subnet: 64,
-            ip_allowlist_check_enabled: true,
-            is_vpc_acccess_proxy: false,
-            is_auth_broker: false,
-            accept_jwts: true,
-            console_redirect_confirmation_timeout: Duration::ZERO,
-        },
-        proxy_protocol_v2: config::ProxyProtocolV2::Rejected,
-        handshake_timeout: Duration::from_secs(10),
-        region: "local".into(),
-        wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?,
-        connect_compute_locks,
-        connect_to_compute: compute_config,
-    })))
-}
-
-/// auth::Backend is created at proxy startup, and lives forever.
-fn build_auth_backend(
-    args: &LocalProxyCliArgs,
-) -> anyhow::Result<&'static auth::Backend<'static, ()>> {
-    let auth_backend = proxy::auth::Backend::Local(proxy::auth::backend::MaybeOwned::Owned(
-        LocalBackend::new(args.postgres, args.compute_ctl.clone()),
-    ));
-
-    Ok(Box::leak(Box::new(auth_backend)))
-}
-
-#[derive(Error, Debug)]
-enum RefreshConfigError {
-    #[error(transparent)]
-    Read(#[from] std::io::Error),
-    #[error(transparent)]
-    Parse(#[from] serde_json::Error),
-    #[error(transparent)]
-    Validate(anyhow::Error),
-}
-
-async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
-    let mut init = true;
-    loop {
-        rx.notified().await;
-
-        match refresh_config_inner(&path).await {
-            Ok(()) => {}
-            // don't log for file not found errors if this is the first time we are checking
-            // for computes that don't use local_proxy, this is not an error.
-            Err(RefreshConfigError::Read(e))
-                if init && e.kind() == std::io::ErrorKind::NotFound =>
-            {
-                debug!(error=?e, ?path, "could not read config file");
-            }
-            Err(e) => {
-                error!(error=?e, ?path, "could not read config file");
-            }
-        }
-
-        init = false;
-    }
-}
-
-async fn refresh_config_inner(path: &Utf8Path) -> Result<(), RefreshConfigError> {
-    let bytes = tokio::fs::read(&path).await?;
-    let data: LocalProxySpec = serde_json::from_slice(&bytes)?;
-
-    let mut jwks_set = vec![];
-
-    fn parse_jwks_settings(jwks: compute_api::spec::JwksSettings) -> anyhow::Result<JwksSettings> {
-        let mut jwks_url = url::Url::from_str(&jwks.jwks_url).context("parsing JWKS url")?;
-
-        ensure!(
-            jwks_url.has_authority()
-                && (jwks_url.scheme() == "http" || jwks_url.scheme() == "https"),
-            "Invalid JWKS url. Must be HTTP",
-        );
-
-        ensure!(
-            jwks_url.host().is_some_and(|h| h != url::Host::Domain("")),
-            "Invalid JWKS url. No domain listed",
-        );
-
-        // clear username, password and ports
-        jwks_url
-            .set_username("")
-            .expect("url can be a base and has a valid host and is not a file. should not error");
-        jwks_url
-            .set_password(None)
-            .expect("url can be a base and has a valid host and is not a file. should not error");
-        // local testing is hard if we need to have a specific restricted port
-        if cfg!(not(feature = "testing")) {
-            jwks_url.set_port(None).expect(
-                "url can be a base and has a valid host and is not a file. should not error",
-            );
-        }
-
-        // clear query params
-        jwks_url.set_fragment(None);
-        jwks_url.query_pairs_mut().clear().finish();
-
-        if jwks_url.scheme() != "https" {
-            // local testing is hard if we need to set up https support.
-            if cfg!(not(feature = "testing")) {
-                jwks_url
-                    .set_scheme("https")
-                    .expect("should not error to set the scheme to https if it was http");
-            } else {
-                warn!(scheme = jwks_url.scheme(), "JWKS url is not HTTPS");
-            }
-        }
-
-        Ok(JwksSettings {
-            id: jwks.id,
-            jwks_url,
-            provider_name: jwks.provider_name,
-            jwt_audience: jwks.jwt_audience,
-            role_names: jwks
-                .role_names
-                .into_iter()
-                .map(RoleName::from)
-                .map(|s| RoleNameInt::from(&s))
-                .collect(),
-        })
-    }
-
-    for jwks in data.jwks.into_iter().flatten() {
-        jwks_set.push(parse_jwks_settings(jwks).map_err(RefreshConfigError::Validate)?);
-    }
-
-    info!("successfully loaded new config");
-    JWKS_ROLE_MAP.store(Some(Arc::new(EndpointJwksResponse { jwks: jwks_set })));
-
-    Ok(())
+    proxy::binary::local_proxy::run().await
 }
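The hunk above reduces `bin/local_proxy.rs` to a thin shim over the library; the same pattern repeats for the other binaries below. The module declarations that make `proxy::binary::local_proxy::run()` resolvable are not part of the hunks shown here; presumably the PR also adds glue along these lines (hypothetical sketch, not taken from this diff):

```rust
// proxy/src/binary/mod.rs -- assumed layout, not shown in this diff.
// Each former binary becomes a library module exposing a public `run()`,
// so the code is reachable from unit tests and rustdoc, while the
// `src/bin/*.rs` files keep only the allocator and the tokio entrypoint.
pub mod local_proxy;
pub mod pg_sni_router;
pub mod proxy;
```

The library crate root (`proxy/src/lib.rs`) would correspondingly gain a `pub mod binary;`, again assuming the conventional layout.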
diff --git a/proxy/src/bin/pg_sni_router.rs b/proxy/src/bin/pg_sni_router.rs
index 97d870a83a..0c3326af85 100644
--- a/proxy/src/bin/pg_sni_router.rs
+++ b/proxy/src/bin/pg_sni_router.rs
@@ -1,299 +1,10 @@
-/// A stand-alone program that routes connections, e.g. from
-/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
-///
-/// This allows connecting to pods/services running in the same Kubernetes cluster from
-/// the outside. Similar to an ingress controller for HTTPS.
-use std::{net::SocketAddr, sync::Arc};
-
-use anyhow::{anyhow, bail, ensure, Context};
-use clap::Arg;
-use futures::future::Either;
-use futures::TryFutureExt;
-use itertools::Itertools;
-use proxy::context::RequestContext;
-use proxy::metrics::{Metrics, ThreadPoolMetrics};
-use proxy::protocol2::ConnectionInfo;
-use proxy::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource};
-use proxy::stream::{PqStream, Stream};
-use proxy::tls::TlsServerEndPoint;
-use rustls::crypto::ring;
-use rustls::pki_types::PrivateKeyDer;
-use tokio::io::{AsyncRead, AsyncWrite};
-use tokio::net::TcpListener;
-use tokio_util::sync::CancellationToken;
-use tracing::{error, info, Instrument};
-use utils::project_git_version;
-use utils::sentry_init::init_sentry;
-
-project_git_version!(GIT_VERSION);
-
-fn cli() -> clap::Command {
-    clap::Command::new("Neon proxy/router")
-        .version(GIT_VERSION)
-        .arg(
-            Arg::new("listen")
-                .short('l')
-                .long("listen")
-                .help("listen for incoming client connections on ip:port")
-                .default_value("127.0.0.1:4432"),
-        )
-        .arg(
-            Arg::new("tls-key")
-                .short('k')
-                .long("tls-key")
-                .help("path to TLS key for client postgres connections")
-                .required(true),
-        )
-        .arg(
-            Arg::new("tls-cert")
-                .short('c')
-                .long("tls-cert")
-                .help("path to TLS cert for client postgres connections")
-                .required(true),
-        )
-        .arg(
-            Arg::new("dest")
-                .short('d')
-                .long("destination")
-                .help("append this domain zone to the SNI hostname to get the destination address")
-                .required(true),
-        )
-}
+//! A stand-alone program that routes connections, e.g. from
+//! `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`.
+//!
+//! This allows connecting to pods/services running in the same Kubernetes cluster from
+//! the outside. Similar to an ingress controller for HTTPS.
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let _logging_guard = proxy::logging::init().await?;
-    let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
-    let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
-
-    Metrics::install(Arc::new(ThreadPoolMetrics::new(0)));
-
-    let args = cli().get_matches();
-    let destination: String = args.get_one::<String>("dest").unwrap().parse()?;
-
-    // Configure TLS
-    let (tls_config, tls_server_end_point): (Arc<rustls::ServerConfig>, TlsServerEndPoint) = match (
-        args.get_one::<String>("tls-key"),
-        args.get_one::<String>("tls-cert"),
-    ) {
-        (Some(key_path), Some(cert_path)) => {
-            let key = {
-                let key_bytes = std::fs::read(key_path).context("TLS key file")?;
-
-                let mut keys =
-                    rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec();
-
-                ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len());
-                PrivateKeyDer::Pkcs8(
-                    keys.pop()
-                        .unwrap()
-                        .context(format!("Failed to read TLS keys at '{key_path}'"))?,
-                )
-            };
-
-            let cert_chain_bytes = std::fs::read(cert_path)
-                .context(format!("Failed to read TLS cert file at '{cert_path}.'"))?;
-
-            let cert_chain: Vec<_> = {
-                rustls_pemfile::certs(&mut &cert_chain_bytes[..])
-                    .try_collect()
-                    .with_context(|| {
-                        format!("Failed to read TLS certificate chain from bytes from file at '{cert_path}'.")
-                    })?
-            };
-
-            // needed for channel bindings
-            let first_cert = cert_chain.first().context("missing certificate")?;
-            let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
-
-            let tls_config =
-                rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
-                    .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
-                    .context("ring should support TLS1.2 and TLS1.3")?
-                    .with_no_client_auth()
-                    .with_single_cert(cert_chain, key)?
-                    .into();
-
-            (tls_config, tls_server_end_point)
-        }
-        _ => bail!("tls-key and tls-cert must be specified"),
-    };
-
-    // Start listening for incoming client connections
-    let proxy_address: SocketAddr = args.get_one::<String>("listen").unwrap().parse()?;
-    info!("Starting sni router on {proxy_address}");
-    let proxy_listener = TcpListener::bind(proxy_address).await?;
-
-    let cancellation_token = CancellationToken::new();
-
-    let main = tokio::spawn(task_main(
-        Arc::new(destination),
-        tls_config,
-        tls_server_end_point,
-        proxy_listener,
-        cancellation_token.clone(),
-    ));
-    let signals_task = tokio::spawn(proxy::signals::handle(cancellation_token, || {}));
-
-    // the signal task cant ever succeed.
-    // the main task can error, or can succeed on cancellation.
-    // we want to immediately exit on either of these cases
-    let signal = match futures::future::select(signals_task, main).await {
-        Either::Left((res, _)) => proxy::error::flatten_err(res)?,
-        Either::Right((res, _)) => return proxy::error::flatten_err(res),
-    };
-
-    // maintenance tasks return `Infallible` success values, this is an impossible value
-    // so this match statically ensures that there are no possibilities for that value
-    match signal {}
-}
-
-async fn task_main(
-    dest_suffix: Arc<String>,
-    tls_config: Arc<rustls::ServerConfig>,
-    tls_server_end_point: TlsServerEndPoint,
-    listener: tokio::net::TcpListener,
-    cancellation_token: CancellationToken,
-) -> anyhow::Result<()> {
-    // When set for the server socket, the keepalive setting
-    // will be inherited by all accepted client sockets.
-    socket2::SockRef::from(&listener).set_keepalive(true)?;
-
-    let connections = tokio_util::task::task_tracker::TaskTracker::new();
-
-    while let Some(accept_result) =
-        run_until_cancelled(listener.accept(), &cancellation_token).await
-    {
-        let (socket, peer_addr) = accept_result?;
-
-        let session_id = uuid::Uuid::new_v4();
-        let tls_config = Arc::clone(&tls_config);
-        let dest_suffix = Arc::clone(&dest_suffix);
-
-        connections.spawn(
-            async move {
-                socket
-                    .set_nodelay(true)
-                    .context("failed to set socket option")?;
-
-                info!(%peer_addr, "serving");
-                let ctx = RequestContext::new(
-                    session_id,
-                    ConnectionInfo {
-                        addr: peer_addr,
-                        extra: None,
-                    },
-                    proxy::metrics::Protocol::SniRouter,
-                    "sni",
-                );
-                handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
-            }
-            .unwrap_or_else(|e| {
-                // Acknowledge that the task has finished with an error.
- error!("per-client task finished with an error: {e:#}"); - }) - .instrument(tracing::info_span!("handle_client", ?session_id)), - ); - } - - connections.close(); - drop(listener); - - connections.wait().await; - - info!("all client connections have finished"); - Ok(()) -} - -const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)"; - -async fn ssl_handshake( - ctx: &RequestContext, - raw_stream: S, - tls_config: Arc, - tls_server_end_point: TlsServerEndPoint, -) -> anyhow::Result> { - let mut stream = PqStream::new(Stream::from_raw(raw_stream)); - - let msg = stream.read_startup_packet().await?; - use pq_proto::FeStartupPacket::*; - - match msg { - SslRequest { direct: false } => { - stream - .write_message(&pq_proto::BeMessage::EncryptionResponse(true)) - .await?; - - // Upgrade raw stream into a secure TLS-backed stream. - // NOTE: We've consumed `tls`; this fact will be used later. - - let (raw, read_buf) = stream.into_inner(); - // TODO: Normally, client doesn't send any data before - // server says TLS handshake is ok and read_buf is empty. - // However, you could imagine pipelining of postgres - // SSLRequest + TLS ClientHello in one hunk similar to - // pipelining in our node js driver. We should probably - // support that by chaining read_buf with the stream. - if !read_buf.is_empty() { - bail!("data is sent before server replied with EncryptionResponse"); - } - - Ok(Stream::Tls { - tls: Box::new( - raw.upgrade(tls_config, !ctx.has_private_peer_addr()) - .await?, - ), - tls_server_end_point, - }) - } - unexpected => { - info!( - ?unexpected, - "unexpected startup packet, rejecting connection" - ); - stream - .throw_error_str(ERR_INSECURE_CONNECTION, proxy::error::ErrorKind::User) - .await? - } - } -} - -async fn handle_client( - ctx: RequestContext, - dest_suffix: Arc, - tls_config: Arc, - tls_server_end_point: TlsServerEndPoint, - stream: impl AsyncRead + AsyncWrite + Unpin, -) -> anyhow::Result<()> { - let mut tls_stream = ssl_handshake(&ctx, stream, tls_config, tls_server_end_point).await?; - - // Cut off first part of the SNI domain - // We receive required destination details in the format of - // `{k8s_service_name}--{k8s_namespace}--{port}.non-sni-domain` - let sni = tls_stream.sni_hostname().ok_or(anyhow!("SNI missing"))?; - let dest: Vec<&str> = sni - .split_once('.') - .context("invalid SNI")? - .0 - .splitn(3, "--") - .collect(); - let port = dest[2].parse::().context("invalid port")?; - let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port); - - info!("destination: {}", destination); - - let mut client = tokio::net::TcpStream::connect(destination).await?; - - // doesn't yet matter as pg-sni-router doesn't report analytics logs - ctx.set_success(); - ctx.log_connect(); - - // Starting from here we only proxy the client's traffic. 
- info!("performing the proxy pass..."); - - match copy_bidirectional_client_compute(&mut tls_stream, &mut client).await { - Ok(_) => Ok(()), - Err(ErrorSource::Client(err)) => Err(err).context("client"), - Err(ErrorSource::Compute(err)) => Err(err).context("compute"), - } + proxy::binary::pg_sni_router::run().await } diff --git a/proxy/src/bin/proxy.rs b/proxy/src/bin/proxy.rs index de685a82c6..7d4b44841d 100644 --- a/proxy/src/bin/proxy.rs +++ b/proxy/src/bin/proxy.rs @@ -1,831 +1,7 @@ -use std::net::SocketAddr; -use std::pin::pin; -use std::sync::Arc; -use std::time::Duration; - -use anyhow::bail; -use futures::future::Either; -use proxy::auth::backend::jwt::JwkCache; -use proxy::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned}; -use proxy::cancellation::{handle_cancel_messages, CancellationHandler}; -use proxy::config::{ - self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, - ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, -}; -use proxy::context::parquet::ParquetUploadArgs; -use proxy::http::health_server::AppMetrics; -use proxy::metrics::Metrics; -use proxy::rate_limiter::{ - EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo, WakeComputeRateLimiter, -}; -use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider; -use proxy::redis::kv_ops::RedisKVClient; -use proxy::redis::{elasticache, notifications}; -use proxy::scram::threadpool::ThreadPool; -use proxy::serverless::cancel_set::CancelSet; -use proxy::serverless::GlobalConnPoolOptions; -use proxy::tls::client_config::compute_client_config_with_root_certs; -use proxy::{auth, control_plane, http, serverless, usage_metrics}; -use remote_storage::RemoteStorageConfig; -use tokio::net::TcpListener; -use tokio::task::JoinSet; -use tokio_util::sync::CancellationToken; -use tracing::{info, warn, Instrument}; -use utils::sentry_init::init_sentry; -use utils::{project_build_tag, project_git_version}; - -project_git_version!(GIT_VERSION); -project_build_tag!(BUILD_TAG); - -use clap::{Parser, ValueEnum}; - #[global_allocator] static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; -#[derive(Clone, Debug, ValueEnum)] -enum AuthBackendType { - #[value(name("cplane-v1"), alias("control-plane"))] - ControlPlaneV1, - - #[value(name("link"), alias("control-redirect"))] - ConsoleRedirect, - - #[cfg(feature = "testing")] - Postgres, -} - -/// Neon proxy/router -#[derive(Parser)] -#[command(version = GIT_VERSION, about)] -struct ProxyCliArgs { - /// Name of the region this proxy is deployed in - #[clap(long, default_value_t = String::new())] - region: String, - /// listen for incoming client connections on ip:port - #[clap(short, long, default_value = "127.0.0.1:4432")] - proxy: String, - #[clap(value_enum, long, default_value_t = AuthBackendType::ConsoleRedirect)] - auth_backend: AuthBackendType, - /// listen for management callback connection on ip:port - #[clap(short, long, default_value = "127.0.0.1:7000")] - mgmt: String, - /// listen for incoming http connections (metrics, etc) on ip:port - #[clap(long, default_value = "127.0.0.1:7001")] - http: String, - /// listen for incoming wss connections on ip:port - #[clap(long)] - wss: Option, - /// redirect unauthenticated users to the given uri in case of console redirect auth - #[clap(short, long, default_value = "http://localhost:3000/psql_session/")] - uri: String, - /// cloud API endpoint for authenticating users - #[clap( - short, - long, - default_value = 
"http://localhost:3000/authenticate_proxy_request/" - )] - auth_endpoint: String, - /// JWT used to connect to control plane. - #[clap( - long, - value_name = "JWT", - default_value = "", - env = "NEON_PROXY_TO_CONTROLPLANE_TOKEN" - )] - control_plane_token: Arc, - /// if this is not local proxy, this toggles whether we accept jwt or passwords for http - #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)] - is_auth_broker: bool, - /// path to TLS key for client postgres connections - /// - /// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir - #[clap(short = 'k', long, alias = "ssl-key")] - tls_key: Option, - /// path to TLS cert for client postgres connections - /// - /// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir - #[clap(short = 'c', long, alias = "ssl-cert")] - tls_cert: Option, - /// Allow writing TLS session keys to the given file pointed to by the environment variable `SSLKEYLOGFILE`. - #[clap(long, alias = "allow-ssl-keylogfile")] - allow_tls_keylogfile: bool, - /// path to directory with TLS certificates for client postgres connections - #[clap(long)] - certs_dir: Option, - /// timeout for the TLS handshake - #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)] - handshake_timeout: tokio::time::Duration, - /// http endpoint to receive periodic metric updates - #[clap(long)] - metric_collection_endpoint: Option, - /// how often metrics should be sent to a collection endpoint - #[clap(long)] - metric_collection_interval: Option, - /// cache for `wake_compute` api method (use `size=0` to disable) - #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)] - wake_compute_cache: String, - /// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable). - #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)] - wake_compute_lock: String, - /// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable). - #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)] - connect_compute_lock: String, - #[clap(flatten)] - sql_over_http: SqlOverHttpArgs, - /// timeout for scram authentication protocol - #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)] - scram_protocol_timeout: tokio::time::Duration, - /// size of the threadpool for password hashing - #[clap(long, default_value_t = 4)] - scram_thread_pool_size: u8, - /// Endpoint rate limiter max number of requests per second. - /// - /// Provided in the form `@`. - /// Can be given multiple times for different bucket sizes. - #[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)] - endpoint_rps_limit: Vec, - /// Wake compute rate limiter max number of requests per second. - #[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)] - wake_compute_limit: Vec, - /// Whether the auth rate limiter actually takes effect (for testing) - #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)] - auth_rate_limit_enabled: bool, - /// Authentication rate limiter max number of hashes per second. 
-    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
-    auth_rate_limit: Vec<RateBucketInfo>,
-    /// The IP subnet to use when considering whether two IP addresses are considered the same.
-    #[clap(long, default_value_t = 64)]
-    auth_rate_limit_ip_subnet: u8,
-    /// Redis rate limiter max number of requests per second.
-    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
-    redis_rps_limit: Vec<RateBucketInfo>,
-    /// Cancellation channel size (max queue size for redis kv client)
-    #[clap(long, default_value = "1024")]
-    cancellation_ch_size: usize,
-    /// cache for `allowed_ips` (use `size=0` to disable)
-    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
-    allowed_ips_cache: String,
-    /// cache for `role_secret` (use `size=0` to disable)
-    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
-    role_secret_cache: String,
-    /// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
-    #[clap(long)]
-    redis_notifications: Option<String>,
-    /// what from the available authentications type to use for the regional redis we have. Supported are "irsa" and "plain".
-    #[clap(long, default_value = "irsa")]
-    redis_auth_type: String,
-    /// redis host for streaming connections (might be different from the notifications host)
-    #[clap(long)]
-    redis_host: Option<String>,
-    /// redis port for streaming connections (might be different from the notifications host)
-    #[clap(long)]
-    redis_port: Option<u16>,
-    /// redis cluster name, used in aws elasticache
-    #[clap(long)]
-    redis_cluster_name: Option<String>,
-    /// redis user_id, used in aws elasticache
-    #[clap(long)]
-    redis_user_id: Option<String>,
-    /// aws region to retrieve credentials
-    #[clap(long, default_value_t = String::new())]
-    aws_region: String,
-    /// cache for `project_info` (use `size=0` to disable)
-    #[clap(long, default_value = config::ProjectInfoCacheOptions::CACHE_DEFAULT_OPTIONS)]
-    project_info_cache: String,
-    /// cache for all valid endpoints
-    #[clap(long, default_value = config::EndpointCacheConfig::CACHE_DEFAULT_OPTIONS)]
-    endpoint_cache_config: String,
-    #[clap(flatten)]
-    parquet_upload: ParquetUploadArgs,
-
-    /// interval for backup metric collection
-    #[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
-    metric_backup_collection_interval: std::time::Duration,
-    /// remote storage configuration for backup metric collection
-    /// Encoded as toml (same format as pageservers), eg
-    /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
-    #[clap(long, value_parser = remote_storage_from_toml)]
-    metric_backup_collection_remote_storage: Option<RemoteStorageConfig>,
-    /// chunk size for backup metric collection
-    /// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
-    #[clap(long, default_value = "4194304")]
-    metric_backup_collection_chunk_size: usize,
-    /// Whether to retry the connection to the compute node
-    #[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
-    connect_to_compute_retry: String,
-    /// Whether to retry the wake_compute request
-    #[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
-    wake_compute_retry: String,
-
-    /// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
-    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
-    is_private_access_proxy: bool,
-
-    /// Configure whether all incoming requests have a Proxy Protocol V2 packet.
-    // TODO(conradludgate): switch default to rejected or required once we've updated all deployments
-    #[clap(value_enum, long, default_value_t = ProxyProtocolV2::Supported)]
-    proxy_protocol_v2: ProxyProtocolV2,
-
-    /// Time the proxy waits for the webauth session to be confirmed by the control plane.
-    // TODO: rename to `console_redirect_confirmation_timeout`.
-    #[clap(long, default_value = "2m", value_parser = humantime::parse_duration)]
-    webauth_confirmation_timeout: std::time::Duration,
-}
-
-#[derive(clap::Args, Clone, Copy, Debug)]
-struct SqlOverHttpArgs {
-    /// timeout for http connection requests
-    #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
-    sql_over_http_timeout: tokio::time::Duration,
-
-    /// Whether the SQL over http pool is opt-in
-    #[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
-    sql_over_http_pool_opt_in: bool,
-
-    /// How many connections to pool for each endpoint. Excess connections are discarded
-    #[clap(long, default_value_t = 20)]
-    sql_over_http_pool_max_conns_per_endpoint: usize,
-
-    /// How many connections to pool for each endpoint. Excess connections are discarded
-    #[clap(long, default_value_t = 20000)]
-    sql_over_http_pool_max_total_conns: usize,
-
-    /// How long pooled connections should remain idle for before closing
-    #[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
-    sql_over_http_idle_timeout: tokio::time::Duration,
-
-    /// Duration each shard will wait on average before a GC sweep.
-    /// A longer time will causes sweeps to take longer but will interfere less frequently.
-    #[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
-    sql_over_http_pool_gc_epoch: tokio::time::Duration,
-
-    /// How many shards should the global pool have. Must be a power of two.
-    /// More shards will introduce less contention for pool operations, but can
-    /// increase memory used by the pool
-    #[clap(long, default_value_t = 128)]
-    sql_over_http_pool_shards: usize,
-
-    #[clap(long, default_value_t = 10000)]
-    sql_over_http_client_conn_threshold: u64,
-
-    #[clap(long, default_value_t = 64)]
-    sql_over_http_cancel_set_shards: usize,
-
-    #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
-    sql_over_http_max_request_size_bytes: usize,
-
-    #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
-    sql_over_http_max_response_size_bytes: usize,
-}
-
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let _logging_guard = proxy::logging::init().await?;
-    let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
-    let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
-
-    // TODO: refactor these to use labels
-    info!("Version: {GIT_VERSION}");
-    info!("Build_tag: {BUILD_TAG}");
-    let neon_metrics = ::metrics::NeonMetrics::new(::metrics::BuildInfo {
-        revision: GIT_VERSION,
-        build_tag: BUILD_TAG,
-    });
-
-    let jemalloc = match proxy::jemalloc::MetricRecorder::new() {
-        Ok(t) => Some(t),
-        Err(e) => {
-            tracing::error!(error = ?e, "could not start jemalloc metrics loop");
-            None
-        }
-    };
-
-    let args = ProxyCliArgs::parse();
-    let config = build_config(&args)?;
-    let auth_backend = build_auth_backend(&args)?;
-
-    match auth_backend {
-        Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"),
-        Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"),
-    };
-    info!("Using region: {}", args.aws_region);
-
-    // TODO: untangle the config args
-    let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) {
-        ("plain", redis_url) => match redis_url {
-            None => {
-                bail!("plain auth requires redis_notifications to be set");
-            }
-            Some(url) => Some(
-                ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()),
-            ),
-        },
-        ("irsa", _) => match (&args.redis_host, args.redis_port) {
-            (Some(host), Some(port)) => Some(
-                ConnectionWithCredentialsProvider::new_with_credentials_provider(
-                    host.to_string(),
-                    port,
-                    elasticache::CredentialsProvider::new(
-                        args.aws_region,
-                        args.redis_cluster_name,
-                        args.redis_user_id,
-                    )
-                    .await,
-                ),
-            ),
-            (None, None) => {
-                warn!("irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client");
-                None
-            }
-            _ => {
-                bail!("redis-host and redis-port must be specified together");
-            }
-        },
-        _ => {
-            bail!("unknown auth type given");
-        }
-    };
-
-    let redis_notifications_client = if let Some(url) = args.redis_notifications {
-        Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()))
-    } else {
-        regional_redis_client.clone()
-    };
-
-    // Check that we can bind to address before further initialization
-    let http_address: SocketAddr = args.http.parse()?;
-    info!("Starting http on {http_address}");
-    let http_listener = TcpListener::bind(http_address).await?.into_std()?;
-
-    let mgmt_address: SocketAddr = args.mgmt.parse()?;
-    info!("Starting mgmt on {mgmt_address}");
-    let mgmt_listener = TcpListener::bind(mgmt_address).await?;
-
-    let proxy_listener = if !args.is_auth_broker {
-        let proxy_address: SocketAddr = args.proxy.parse()?;
-        info!("Starting proxy on {proxy_address}");
-
-        Some(TcpListener::bind(proxy_address).await?)
-    } else {
-        None
-    };
-
-    // TODO: rename the argument to something like serverless.
-    // It now covers more than just websockets, it also covers SQL over HTTP.
-    let serverless_listener = if let Some(serverless_address) = args.wss {
-        let serverless_address: SocketAddr = serverless_address.parse()?;
-        info!("Starting wss on {serverless_address}");
-        Some(TcpListener::bind(serverless_address).await?)
-    } else if args.is_auth_broker {
-        bail!("wss arg must be present for auth-broker")
-    } else {
-        None
-    };
-
-    let cancellation_token = CancellationToken::new();
-
-    let redis_rps_limit = Vec::leak(args.redis_rps_limit.clone());
-    RateBucketInfo::validate(redis_rps_limit)?;
-
-    let redis_kv_client = regional_redis_client
-        .as_ref()
-        .map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit));
-
-    // channel size should be higher than redis client limit to avoid blocking
-    let cancel_ch_size = args.cancellation_ch_size;
-    let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size);
-    let cancellation_handler = Arc::new(CancellationHandler::new(
-        &config.connect_to_compute,
-        Some(tx_cancel),
-    ));
-
-    // bit of a hack - find the min rps and max rps supported and turn it into
-    // leaky bucket config instead
-    let max = args
-        .endpoint_rps_limit
-        .iter()
-        .map(|x| x.rps())
-        .max_by(f64::total_cmp)
-        .unwrap_or(EndpointRateLimiter::DEFAULT.max);
-    let rps = args
-        .endpoint_rps_limit
-        .iter()
-        .map(|x| x.rps())
-        .min_by(f64::total_cmp)
-        .unwrap_or(EndpointRateLimiter::DEFAULT.rps);
-    let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
-        LeakyBucketConfig { rps, max },
-        64,
-    ));
-
-    // client facing tasks. these will exit on error or on cancellation
-    // cancellation returns Ok(())
-    let mut client_tasks = JoinSet::new();
-    match auth_backend {
-        Either::Left(auth_backend) => {
-            if let Some(proxy_listener) = proxy_listener {
-                client_tasks.spawn(proxy::proxy::task_main(
-                    config,
-                    auth_backend,
-                    proxy_listener,
-                    cancellation_token.clone(),
-                    cancellation_handler.clone(),
-                    endpoint_rate_limiter.clone(),
-                ));
-            }
-
-            if let Some(serverless_listener) = serverless_listener {
-                client_tasks.spawn(serverless::task_main(
-                    config,
-                    auth_backend,
-                    serverless_listener,
-                    cancellation_token.clone(),
-                    cancellation_handler.clone(),
-                    endpoint_rate_limiter.clone(),
-                ));
-            }
-        }
-        Either::Right(auth_backend) => {
-            if let Some(proxy_listener) = proxy_listener {
-                client_tasks.spawn(proxy::console_redirect_proxy::task_main(
-                    config,
-                    auth_backend,
-                    proxy_listener,
-                    cancellation_token.clone(),
-                    cancellation_handler.clone(),
-                ));
-            }
-        }
-    }
-
-    client_tasks.spawn(proxy::context::parquet::worker(
-        cancellation_token.clone(),
-        args.parquet_upload,
-    ));
-
-    // maintenance tasks. these never return unless there's an error
-    let mut maintenance_tasks = JoinSet::new();
-    maintenance_tasks.spawn(proxy::signals::handle(cancellation_token.clone(), || {}));
-    maintenance_tasks.spawn(http::health_server::task_main(
-        http_listener,
-        AppMetrics {
-            jemalloc,
-            neon_metrics,
-            proxy: proxy::metrics::Metrics::get(),
-        },
-    ));
-    maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));
-
-    if let Some(metrics_config) = &config.metric_collection {
-        // TODO: Add gc regardles of the metric collection being enabled.
-        maintenance_tasks.spawn(usage_metrics::task_main(metrics_config));
-    }
-
-    if let Either::Left(auth::Backend::ControlPlane(api, _)) = &auth_backend {
-        if let proxy::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api {
-            match (redis_notifications_client, regional_redis_client.clone()) {
-                (None, None) => {}
-                (client1, client2) => {
-                    let cache = api.caches.project_info.clone();
-                    if let Some(client) = client1 {
-                        maintenance_tasks.spawn(notifications::task_main(
-                            client,
-                            cache.clone(),
-                            args.region.clone(),
-                        ));
-                    }
-                    if let Some(client) = client2 {
-                        maintenance_tasks.spawn(notifications::task_main(
-                            client,
-                            cache.clone(),
-                            args.region.clone(),
-                        ));
-                    }
-                    maintenance_tasks.spawn(async move { cache.clone().gc_worker().await });
-                }
-            }
-
-            if let Some(mut redis_kv_client) = redis_kv_client {
-                maintenance_tasks.spawn(async move {
-                    redis_kv_client.try_connect().await?;
-                    handle_cancel_messages(&mut redis_kv_client, rx_cancel).await
-                });
-            }
-
-            if let Some(regional_redis_client) = regional_redis_client {
-                let cache = api.caches.endpoints_cache.clone();
-                let con = regional_redis_client;
-                let span = tracing::info_span!("endpoints_cache");
-                maintenance_tasks.spawn(
-                    async move { cache.do_read(con, cancellation_token.clone()).await }
-                        .instrument(span),
-                );
-            }
-        }
-    }
-
-    let maintenance = loop {
-        // get one complete task
-        match futures::future::select(
-            pin!(maintenance_tasks.join_next()),
-            pin!(client_tasks.join_next()),
-        )
-        .await
-        {
-            // exit immediately on maintenance task completion
-            Either::Left((Some(res), _)) => break proxy::error::flatten_err(res)?,
-            // exit with error immediately if all maintenance tasks have ceased (should be caught by branch above)
-            Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"),
-            // exit immediately on client task error
-            Either::Right((Some(res), _)) => proxy::error::flatten_err(res)?,
-            // exit if all our client tasks have shutdown gracefully
-            Either::Right((None, _)) => return Ok(()),
-        }
-    };
-
-    // maintenance tasks return Infallible success values, this is an impossible value
-    // so this match statically ensures that there are no possibilities for that value
-    match maintenance {}
-}
-
-/// ProxyConfig is created at proxy startup, and lives forever.
-fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
-    let thread_pool = ThreadPool::new(args.scram_thread_pool_size);
-    Metrics::install(thread_pool.metrics.clone());
-
-    let tls_config = match (&args.tls_key, &args.tls_cert) {
-        (Some(key_path), Some(cert_path)) => Some(config::configure_tls(
-            key_path,
-            cert_path,
-            args.certs_dir.as_ref(),
-            args.allow_tls_keylogfile,
-        )?),
-        (None, None) => None,
-        _ => bail!("either both or neither tls-key and tls-cert must be specified"),
-    };
-
-    let backup_metric_collection_config = config::MetricBackupCollectionConfig {
-        interval: args.metric_backup_collection_interval,
-        remote_storage_config: args.metric_backup_collection_remote_storage.clone(),
-        chunk_size: args.metric_backup_collection_chunk_size,
-    };
-
-    let metric_collection = match (
-        &args.metric_collection_endpoint,
-        &args.metric_collection_interval,
-    ) {
-        (Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig {
-            endpoint: endpoint.parse()?,
-            interval: humantime::parse_duration(interval)?,
-            backup_metric_collection_config,
-        }),
-        (None, None) => None,
-        _ => bail!(
-            "either both or neither metric-collection-endpoint \
-             and metric-collection-interval must be specified"
-        ),
-    };
-
-    let config::ConcurrencyLockOptions {
-        shards,
-        limiter,
-        epoch,
-        timeout,
-    } = args.connect_compute_lock.parse()?;
-    info!(
-        ?limiter,
-        shards,
-        ?epoch,
-        "Using NodeLocks (connect_compute)"
-    );
-    let connect_compute_locks = control_plane::locks::ApiLocks::new(
-        "connect_compute_lock",
-        limiter,
-        shards,
-        timeout,
-        epoch,
-        &Metrics::get().proxy.connect_compute_lock,
-    )?;
-
-    let http_config = HttpConfig {
-        accept_websockets: !args.is_auth_broker,
-        pool_options: GlobalConnPoolOptions {
-            max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint,
-            gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch,
-            pool_shards: args.sql_over_http.sql_over_http_pool_shards,
-            idle_timeout: args.sql_over_http.sql_over_http_idle_timeout,
-            opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
-            max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
-        },
-        cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
-        client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
-        max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes,
-        max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes,
-    };
-    let authentication_config = AuthenticationConfig {
-        jwks_cache: JwkCache::default(),
-        thread_pool,
-        scram_protocol_timeout: args.scram_protocol_timeout,
-        rate_limiter_enabled: args.auth_rate_limit_enabled,
-        rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()),
-        rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet,
-        ip_allowlist_check_enabled: !args.is_private_access_proxy,
-        is_vpc_acccess_proxy: args.is_private_access_proxy,
-        is_auth_broker: args.is_auth_broker,
-        accept_jwts: args.is_auth_broker,
-        console_redirect_confirmation_timeout: args.webauth_confirmation_timeout,
-    };
-
-    let compute_config = ComputeConfig {
-        retry: config::RetryConfig::parse(&args.connect_to_compute_retry)?,
-        tls: Arc::new(compute_client_config_with_root_certs()?),
-        timeout: Duration::from_secs(2),
-    };
-
-    let config = ProxyConfig {
-        tls_config,
-        metric_collection,
-        http_config,
-        authentication_config,
-        proxy_protocol_v2: args.proxy_protocol_v2,
-        handshake_timeout: args.handshake_timeout,
-        region: args.region.clone(),
-        wake_compute_retry_config: config::RetryConfig::parse(&args.wake_compute_retry)?,
-        connect_compute_locks,
-        connect_to_compute: compute_config,
-    };
-
-    let config = Box::leak(Box::new(config));
-
-    tokio::spawn(config.connect_compute_locks.garbage_collect_worker());
-
-    Ok(config)
-}
-
-/// auth::Backend is created at proxy startup, and lives forever.
-fn build_auth_backend(
-    args: &ProxyCliArgs,
-) -> anyhow::Result<Either<&'static auth::Backend<'static, ()>, &'static ConsoleRedirectBackend>> {
-    match &args.auth_backend {
-        AuthBackendType::ControlPlaneV1 => {
-            let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
-            let project_info_cache_config: ProjectInfoCacheOptions =
-                args.project_info_cache.parse()?;
-            let endpoint_cache_config: config::EndpointCacheConfig =
-                args.endpoint_cache_config.parse()?;
-
-            info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
-            info!(
-                "Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
-            );
-            info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
-            let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new(
-                wake_compute_cache_config,
-                project_info_cache_config,
-                endpoint_cache_config,
-            )));
-
-            let config::ConcurrencyLockOptions {
-                shards,
-                limiter,
-                epoch,
-                timeout,
-            } = args.wake_compute_lock.parse()?;
-            info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
-            let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new(
-                "wake_compute_lock",
-                limiter,
-                shards,
-                timeout,
-                epoch,
-                &Metrics::get().wake_compute_lock,
-            )?));
-            tokio::spawn(locks.garbage_collect_worker());
-
-            let url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
-
-            let endpoint = http::Endpoint::new(url, http::new_client());
-
-            let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
-            RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
-            let wake_compute_endpoint_rate_limiter =
-                Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
-
-            let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
-                endpoint,
-                args.control_plane_token.clone(),
-                caches,
-                locks,
-                wake_compute_endpoint_rate_limiter,
-            );
-
-            let api = control_plane::client::ControlPlaneClient::ProxyV1(api);
-            let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
-            let config = Box::leak(Box::new(auth_backend));
-
-            Ok(Either::Left(config))
-        }
-
-        #[cfg(feature = "testing")]
-        AuthBackendType::Postgres => {
-            let url = args.auth_endpoint.parse()?;
-            let api = control_plane::client::mock::MockControlPlane::new(
-                url,
-                !args.is_private_access_proxy,
-            );
-            let api = control_plane::client::ControlPlaneClient::PostgresMock(api);
-
-            let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ());
-
-            let config = Box::leak(Box::new(auth_backend));
-
-            Ok(Either::Left(config))
-        }
-
-        AuthBackendType::ConsoleRedirect => {
-            let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?;
-            let project_info_cache_config: ProjectInfoCacheOptions =
-                args.project_info_cache.parse()?;
-            let endpoint_cache_config: config::EndpointCacheConfig =
-                args.endpoint_cache_config.parse()?;
-
-            info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}");
-            info!(
-                "Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}"
-            );
-            info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}");
-            let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new(
-                wake_compute_cache_config,
-                project_info_cache_config,
-                endpoint_cache_config,
-            )));
-
-            let config::ConcurrencyLockOptions {
-                shards,
-                limiter,
-                epoch,
-                timeout,
-            } = args.wake_compute_lock.parse()?;
-            info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)");
-            let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new(
-                "wake_compute_lock",
-                limiter,
-                shards,
-                timeout,
-                epoch,
-                &Metrics::get().wake_compute_lock,
-            )?));
-
-            let url = args.uri.clone().parse()?;
-            let ep_url: proxy::url::ApiUrl = args.auth_endpoint.parse()?;
-            let endpoint = http::Endpoint::new(ep_url, http::new_client());
-            let mut wake_compute_rps_limit = args.wake_compute_limit.clone();
-            RateBucketInfo::validate(&mut wake_compute_rps_limit)?;
-            let wake_compute_endpoint_rate_limiter =
-                Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit));
-
-            // Since we use only get_allowed_ips_and_secret() wake_compute_endpoint_rate_limiter
-            // and locks are not used in ConsoleRedirectBackend,
-            // but they are required by the NeonControlPlaneClient
-            let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new(
-                endpoint,
-                args.control_plane_token.clone(),
-                caches,
-                locks,
-                wake_compute_endpoint_rate_limiter,
-            );
-
-            let backend = ConsoleRedirectBackend::new(url, api);
-            let config = Box::leak(Box::new(backend));
-
-            Ok(Either::Right(config))
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use std::time::Duration;
-
-    use clap::Parser;
-    use proxy::rate_limiter::RateBucketInfo;
-
-    #[test]
-    fn parse_endpoint_rps_limit() {
-        let config = super::ProxyCliArgs::parse_from([
-            "proxy",
-            "--endpoint-rps-limit",
-            "100@1s",
-            "--endpoint-rps-limit",
-            "20@30s",
-        ]);
-
-        assert_eq!(
-            config.endpoint_rps_limit,
-            vec![
-                RateBucketInfo::new(100, Duration::from_secs(1)),
-                RateBucketInfo::new(20, Duration::from_secs(30)),
-            ]
-        );
-    }
+    proxy::binary::proxy::run().await
 }
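One detail from the `main` removed above deserves a note: several `--endpoint-rps-limit` buckets are collapsed into a single leaky-bucket config (the "bit of a hack" comment), taking the smallest rate as the sustained `rps` and the largest as the burst `max`. A standalone sketch of that reduction, with a stand-in `LeakyBucketConfig` (the real type lives in `proxy::rate_limiter`):

```rust
// Hypothetical standalone version of the min/max reduction performed in
// main(): the smallest configured rps becomes the sustained refill rate,
// the largest becomes the burst capacity.
struct LeakyBucketConfig {
    rps: f64,
    max: f64,
}

fn reduce_buckets(rps_limits: &[f64], default: LeakyBucketConfig) -> LeakyBucketConfig {
    LeakyBucketConfig {
        // min_by/max_by with f64::total_cmp matches the real code's
        // handling of floating-point ordering.
        rps: rps_limits.iter().copied().min_by(f64::total_cmp).unwrap_or(default.rps),
        max: rps_limits.iter().copied().max_by(f64::total_cmp).unwrap_or(default.max),
    }
}

fn main() {
    // e.g. `--endpoint-rps-limit 100@1s --endpoint-rps-limit 20@30s`
    let cfg = reduce_buckets(&[100.0, 20.0], LeakyBucketConfig { rps: 10.0, max: 100.0 });
    assert_eq!((cfg.rps, cfg.max), (20.0, 100.0));
}
```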
diff --git a/proxy/src/binary/local_proxy.rs b/proxy/src/binary/local_proxy.rs
new file mode 100644
index 0000000000..e0d8515375
--- /dev/null
+++ b/proxy/src/binary/local_proxy.rs
@@ -0,0 +1,410 @@
+use std::net::SocketAddr;
+use std::pin::pin;
+use std::str::FromStr;
+use std::sync::Arc;
+use std::time::Duration;
+
+use crate::auth::backend::jwt::JwkCache;
+use crate::auth::backend::local::{LocalBackend, JWKS_ROLE_MAP};
+use crate::auth::{self};
+use crate::cancellation::CancellationHandler;
+use crate::config::{
+    self, AuthenticationConfig, ComputeConfig, HttpConfig, ProxyConfig, RetryConfig,
+};
+use crate::control_plane::locks::ApiLocks;
+use crate::control_plane::messages::{EndpointJwksResponse, JwksSettings};
+use crate::http::health_server::AppMetrics;
+use crate::intern::RoleNameInt;
+use crate::metrics::{Metrics, ThreadPoolMetrics};
+use crate::rate_limiter::{
+    BucketRateLimiter, EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo,
+};
+use crate::scram::threadpool::ThreadPool;
+use crate::serverless::cancel_set::CancelSet;
+use crate::serverless::{self, GlobalConnPoolOptions};
+use crate::tls::client_config::compute_client_config_with_root_certs;
+use crate::types::RoleName;
+use crate::url::ApiUrl;
+use anyhow::{bail, ensure, Context};
+use camino::{Utf8Path, Utf8PathBuf};
+use compute_api::spec::LocalProxySpec;
+use futures::future::Either;
+
+project_git_version!(GIT_VERSION);
+project_build_tag!(BUILD_TAG);
+
+use clap::Parser;
+use thiserror::Error;
+use tokio::net::TcpListener;
+use tokio::sync::Notify;
+use tokio::task::JoinSet;
+use tokio_util::sync::CancellationToken;
+use tracing::{debug, error, info, warn};
+use utils::sentry_init::init_sentry;
+use utils::{pid_file, project_build_tag, project_git_version};
+
+/// Neon proxy/router
+#[derive(Parser)]
+#[command(version = GIT_VERSION, about)]
+struct LocalProxyCliArgs {
+    /// listen for incoming metrics connections on ip:port
+    #[clap(long, default_value = "127.0.0.1:7001")]
+    metrics: String,
+    /// listen for incoming http connections on ip:port
+    #[clap(long)]
+    http: String,
+    /// timeout for the TLS handshake
+    #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
+    handshake_timeout: tokio::time::Duration,
+    /// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
+    #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
+    connect_compute_lock: String,
+    #[clap(flatten)]
+    sql_over_http: SqlOverHttpArgs,
+    /// User rate limiter max number of requests per second.
+    ///
+    /// Provided in the form `<Requests Per Second>@<Bucket Duration Size>`.
+    /// Can be given multiple times for different bucket sizes.
+    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
+    user_rps_limit: Vec<RateBucketInfo>,
+    /// Whether the auth rate limiter actually takes effect (for testing)
+    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
+    auth_rate_limit_enabled: bool,
+    /// Authentication rate limiter max number of hashes per second.
+    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
+    auth_rate_limit: Vec<RateBucketInfo>,
+    /// The IP subnet to use when considering whether two IP addresses are considered the same.
+    #[clap(long, default_value_t = 64)]
+    auth_rate_limit_ip_subnet: u8,
+    /// Whether to retry the connection to the compute node
+    #[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
+    connect_to_compute_retry: String,
+    /// Address of the postgres server
+    #[clap(long, default_value = "127.0.0.1:5432")]
+    postgres: SocketAddr,
+    /// Address of the internal compute-ctl api service
+    #[clap(long, default_value = "http://127.0.0.1:3081/")]
+    compute_ctl: ApiUrl,
+    /// Path of the local proxy config file
+    #[clap(long, default_value = "./local_proxy.json")]
+    config_path: Utf8PathBuf,
+    /// Path of the local proxy PID file
+    #[clap(long, default_value = "./local_proxy.pid")]
+    pid_path: Utf8PathBuf,
+}
+
+#[derive(clap::Args, Clone, Copy, Debug)]
+struct SqlOverHttpArgs {
Excess connections are discarded
+    #[clap(long, default_value_t = 200)]
+    sql_over_http_pool_max_total_conns: usize,
+
+    /// How long pooled connections should remain idle for before closing
+    #[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
+    sql_over_http_idle_timeout: tokio::time::Duration,
+
+    #[clap(long, default_value_t = 100)]
+    sql_over_http_client_conn_threshold: u64,
+
+    #[clap(long, default_value_t = 16)]
+    sql_over_http_cancel_set_shards: usize,
+
+    #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
+    sql_over_http_max_request_size_bytes: usize,
+
+    #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB
+    sql_over_http_max_response_size_bytes: usize,
+}
+
+pub async fn run() -> anyhow::Result<()> {
+    let _logging_guard = crate::logging::init_local_proxy()?;
+    let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
+    let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
+
+    Metrics::install(Arc::new(ThreadPoolMetrics::new(0)));
+
+    // TODO: refactor these to use labels
+    debug!("Version: {GIT_VERSION}");
+    debug!("Build_tag: {BUILD_TAG}");
+    let neon_metrics = ::metrics::NeonMetrics::new(::metrics::BuildInfo {
+        revision: GIT_VERSION,
+        build_tag: BUILD_TAG,
+    });
+
+    let jemalloc = match crate::jemalloc::MetricRecorder::new() {
+        Ok(t) => Some(t),
+        Err(e) => {
+            tracing::error!(error = ?e, "could not start jemalloc metrics loop");
+            None
+        }
+    };
+
+    let args = LocalProxyCliArgs::parse();
+    let config = build_config(&args)?;
+    let auth_backend = build_auth_backend(&args);
+
+    // before we bind to any ports, write the process ID to a file
+    // so that compute-ctl can find our process later
+    // in order to trigger the appropriate SIGHUP on config change.
+    //
+    // This also claims a "lock" that makes sure only one instance
+    // of local_proxy runs at a time.
+    let _process_guard = loop {
+        match pid_file::claim_for_current_process(&args.pid_path) {
+            Ok(guard) => break guard,
+            Err(e) => {
+                // compute-ctl might have tried to read the pid-file to let us
+                // know about some config change. We should try again.
+                error!(path=?args.pid_path, "could not claim PID file guard: {e:?}");
+                tokio::time::sleep(Duration::from_secs(1)).await;
+            }
+        }
+    };
+
+    let metrics_listener = TcpListener::bind(args.metrics).await?.into_std()?;
+    let http_listener = TcpListener::bind(args.http).await?;
+    let shutdown = CancellationToken::new();
+
+    // todo: should scale with CU
+    let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
+        LeakyBucketConfig {
+            rps: 10.0,
+            max: 100.0,
+        },
+        16,
+    ));
+
+    let mut maintenance_tasks = JoinSet::new();
+
+    let refresh_config_notify = Arc::new(Notify::new());
+    maintenance_tasks.spawn(crate::signals::handle(shutdown.clone(), {
+        let refresh_config_notify = Arc::clone(&refresh_config_notify);
+        move || {
+            refresh_config_notify.notify_one();
+        }
+    }));
+
+    // trigger the first config load **after** setting up the signal hook
+    // to avoid the race condition where:
+    // 1. No config file is registered when local_proxy starts up
+    // 2. The config file is written but the signal is not yet received
+    // 3. local_proxy completes startup but has no config loaded, despite there being a registered config.
+ refresh_config_notify.notify_one(); + tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify)); + + maintenance_tasks.spawn(crate::http::health_server::task_main( + metrics_listener, + AppMetrics { + jemalloc, + neon_metrics, + proxy: crate::metrics::Metrics::get(), + }, + )); + + let task = serverless::task_main( + config, + auth_backend, + http_listener, + shutdown.clone(), + Arc::new(CancellationHandler::new(&config.connect_to_compute, None)), + endpoint_rate_limiter, + ); + + match futures::future::select(pin!(maintenance_tasks.join_next()), pin!(task)).await { + // exit immediately on maintenance task completion + Either::Left((Some(res), _)) => match crate::error::flatten_err(res)? {}, + // exit with error immediately if all maintenance tasks have ceased (should be caught by branch above) + Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"), + // exit immediately on client task error + Either::Right((res, _)) => res?, + } + + Ok(()) +} + +/// ProxyConfig is created at proxy startup, and lives forever. +fn build_config(args: &LocalProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { + let config::ConcurrencyLockOptions { + shards, + limiter, + epoch, + timeout, + } = args.connect_compute_lock.parse()?; + info!( + ?limiter, + shards, + ?epoch, + "Using NodeLocks (connect_compute)" + ); + let connect_compute_locks = ApiLocks::new( + "connect_compute_lock", + limiter, + shards, + timeout, + epoch, + &Metrics::get().proxy.connect_compute_lock, + ); + + let http_config = HttpConfig { + accept_websockets: false, + pool_options: GlobalConnPoolOptions { + gc_epoch: Duration::from_secs(60), + pool_shards: 2, + idle_timeout: args.sql_over_http.sql_over_http_idle_timeout, + opt_in: false, + + max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_total_conns, + max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns, + }, + cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards), + client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold, + max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes, + max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes, + }; + + let compute_config = ComputeConfig { + retry: RetryConfig::parse(RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)?, + tls: Arc::new(compute_client_config_with_root_certs()?), + timeout: Duration::from_secs(2), + }; + + Ok(Box::leak(Box::new(ProxyConfig { + tls_config: None, + metric_collection: None, + http_config, + authentication_config: AuthenticationConfig { + jwks_cache: JwkCache::default(), + thread_pool: ThreadPool::new(0), + scram_protocol_timeout: Duration::from_secs(10), + rate_limiter_enabled: false, + rate_limiter: BucketRateLimiter::new(vec![]), + rate_limit_ip_subnet: 64, + ip_allowlist_check_enabled: true, + is_vpc_acccess_proxy: false, + is_auth_broker: false, + accept_jwts: true, + console_redirect_confirmation_timeout: Duration::ZERO, + }, + proxy_protocol_v2: config::ProxyProtocolV2::Rejected, + handshake_timeout: Duration::from_secs(10), + region: "local".into(), + wake_compute_retry_config: RetryConfig::parse(RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)?, + connect_compute_locks, + connect_to_compute: compute_config, + }))) +} + +/// auth::Backend is created at proxy startup, and lives forever. 
+fn build_auth_backend(args: &LocalProxyCliArgs) -> &'static auth::Backend<'static, ()> {
+    let auth_backend = crate::auth::Backend::Local(crate::auth::backend::MaybeOwned::Owned(
+        LocalBackend::new(args.postgres, args.compute_ctl.clone()),
+    ));
+
+    Box::leak(Box::new(auth_backend))
+}
+
+#[derive(Error, Debug)]
+enum RefreshConfigError {
+    #[error(transparent)]
+    Read(#[from] std::io::Error),
+    #[error(transparent)]
+    Parse(#[from] serde_json::Error),
+    #[error(transparent)]
+    Validate(anyhow::Error),
+}
+
+async fn refresh_config_loop(path: Utf8PathBuf, rx: Arc<Notify>) {
+    let mut init = true;
+    loop {
+        rx.notified().await;
+
+        match refresh_config_inner(&path).await {
+            Ok(()) => {}
+            // don't log file-not-found errors on the first check;
+            // for computes that don't use local_proxy, this is not an error.
+            Err(RefreshConfigError::Read(e))
+                if init && e.kind() == std::io::ErrorKind::NotFound =>
+            {
+                debug!(error=?e, ?path, "could not read config file");
+            }
+            Err(e) => {
+                error!(error=?e, ?path, "could not read config file");
+            }
+        }
+
+        init = false;
+    }
+}
+
+async fn refresh_config_inner(path: &Utf8Path) -> Result<(), RefreshConfigError> {
+    let bytes = tokio::fs::read(&path).await?;
+    let data: LocalProxySpec = serde_json::from_slice(&bytes)?;
+
+    let mut jwks_set = vec![];
+
+    fn parse_jwks_settings(jwks: compute_api::spec::JwksSettings) -> anyhow::Result<JwksSettings> {
+        let mut jwks_url = url::Url::from_str(&jwks.jwks_url).context("parsing JWKS url")?;
+
+        ensure!(
+            jwks_url.has_authority()
+                && (jwks_url.scheme() == "http" || jwks_url.scheme() == "https"),
+            "Invalid JWKS url. Must be HTTP",
+        );
+
+        ensure!(
+            jwks_url.host().is_some_and(|h| h != url::Host::Domain("")),
+            "Invalid JWKS url. No domain listed",
+        );
+
+        // clear username, password and ports
+        jwks_url
+            .set_username("")
+            .expect("url can be a base and has a valid host and is not a file. should not error");
+        jwks_url
+            .set_password(None)
+            .expect("url can be a base and has a valid host and is not a file. should not error");
+        // local testing is hard if we need to have a specific restricted port
+        if cfg!(not(feature = "testing")) {
+            jwks_url.set_port(None).expect(
+                "url can be a base and has a valid host and is not a file. should not error",
+            );
+        }
+
+        // clear query params
+        jwks_url.set_fragment(None);
+        jwks_url.query_pairs_mut().clear().finish();
+
+        if jwks_url.scheme() != "https" {
+            // local testing is hard if we need to set up https support.
+            if cfg!(not(feature = "testing")) {
+                jwks_url
+                    .set_scheme("https")
+                    .expect("should not error to set the scheme to https if it was http");
+            } else {
+                warn!(scheme = jwks_url.scheme(), "JWKS url is not HTTPS");
+            }
+        }
+
+        Ok(JwksSettings {
+            id: jwks.id,
+            jwks_url,
+            _provider_name: jwks.provider_name,
+            jwt_audience: jwks.jwt_audience,
+            role_names: jwks
+                .role_names
+                .into_iter()
+                .map(RoleName::from)
+                .map(|s| RoleNameInt::from(&s))
+                .collect(),
+        })
+    }
+
+    for jwks in data.jwks.into_iter().flatten() {
+        jwks_set.push(parse_jwks_settings(jwks).map_err(RefreshConfigError::Validate)?);
+    }
+
+    info!("successfully loaded new config");
+    JWKS_ROLE_MAP.store(Some(Arc::new(EndpointJwksResponse { jwks: jwks_set })));
+
+    Ok(())
+}
diff --git a/proxy/src/binary/mod.rs b/proxy/src/binary/mod.rs
new file mode 100644
index 0000000000..dc07d3e675
--- /dev/null
+++ b/proxy/src/binary/mod.rs
@@ -0,0 +1,7 @@
+//! All binaries have the body of their main() defined here, so that the code
+//!
is also covered by code style configs in lib.rs and the unused-code check is +//! more effective when practically all modules are private to the lib. + +pub mod local_proxy; +pub mod pg_sni_router; +pub mod proxy; diff --git a/proxy/src/binary/pg_sni_router.rs b/proxy/src/binary/pg_sni_router.rs new file mode 100644 index 0000000000..235e9674c6 --- /dev/null +++ b/proxy/src/binary/pg_sni_router.rs @@ -0,0 +1,304 @@ +/// A stand-alone program that routes connections, e.g. from +/// `aaa--bbb--1234.external.domain` to `aaa.bbb.internal.domain:1234`. +/// +/// This allows connecting to pods/services running in the same Kubernetes cluster from +/// the outside. Similar to an ingress controller for HTTPS. +use std::{net::SocketAddr, sync::Arc}; + +use crate::context::RequestContext; +use crate::metrics::{Metrics, ThreadPoolMetrics}; +use crate::protocol2::ConnectionInfo; +use crate::proxy::{copy_bidirectional_client_compute, run_until_cancelled, ErrorSource}; +use crate::stream::{PqStream, Stream}; +use crate::tls::TlsServerEndPoint; +use anyhow::{anyhow, bail, ensure, Context}; +use clap::Arg; +use futures::future::Either; +use futures::TryFutureExt; +use itertools::Itertools; +use rustls::crypto::ring; +use rustls::pki_types::PrivateKeyDer; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::TcpListener; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, Instrument}; +use utils::project_git_version; +use utils::sentry_init::init_sentry; + +project_git_version!(GIT_VERSION); + +fn cli() -> clap::Command { + clap::Command::new("Neon proxy/router") + .version(GIT_VERSION) + .arg( + Arg::new("listen") + .short('l') + .long("listen") + .help("listen for incoming client connections on ip:port") + .default_value("127.0.0.1:4432"), + ) + .arg( + Arg::new("tls-key") + .short('k') + .long("tls-key") + .help("path to TLS key for client postgres connections") + .required(true), + ) + .arg( + Arg::new("tls-cert") + .short('c') + .long("tls-cert") + .help("path to TLS cert for client postgres connections") + .required(true), + ) + .arg( + Arg::new("dest") + .short('d') + .long("destination") + .help("append this domain zone to the SNI hostname to get the destination address") + .required(true), + ) +} + +pub async fn run() -> anyhow::Result<()> { + let _logging_guard = crate::logging::init().await?; + let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); + let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); + + Metrics::install(Arc::new(ThreadPoolMetrics::new(0))); + + let args = cli().get_matches(); + let destination: String = args + .get_one::("dest") + .expect("string argument defined") + .parse()?; + + // Configure TLS + let (tls_config, tls_server_end_point): (Arc, TlsServerEndPoint) = match ( + args.get_one::("tls-key"), + args.get_one::("tls-cert"), + ) { + (Some(key_path), Some(cert_path)) => { + let key = { + let key_bytes = std::fs::read(key_path).context("TLS key file")?; + + let mut keys = + rustls_pemfile::pkcs8_private_keys(&mut &key_bytes[..]).collect_vec(); + + ensure!(keys.len() == 1, "keys.len() = {} (should be 1)", keys.len()); + PrivateKeyDer::Pkcs8( + keys.pop() + .expect("keys should not be empty") + .context(format!("Failed to read TLS keys at '{key_path}'"))?, + ) + }; + + let cert_chain_bytes = std::fs::read(cert_path) + .context(format!("Failed to read TLS cert file at '{cert_path}.'"))?; + + let cert_chain: Vec<_> = { + rustls_pemfile::certs(&mut &cert_chain_bytes[..]) + .try_collect() + .with_context(|| { + 
format!("Failed to read TLS certificate chain from file at '{cert_path}'.")
+                    })?
+            };
+
+            // needed for channel bindings
+            let first_cert = cert_chain.first().context("missing certificate")?;
+            let tls_server_end_point = TlsServerEndPoint::new(first_cert)?;
+
+            let tls_config =
+                rustls::ServerConfig::builder_with_provider(Arc::new(ring::default_provider()))
+                    .with_protocol_versions(&[&rustls::version::TLS13, &rustls::version::TLS12])
+                    .context("ring should support TLS1.2 and TLS1.3")?
+                    .with_no_client_auth()
+                    .with_single_cert(cert_chain, key)?
+                    .into();
+
+            (tls_config, tls_server_end_point)
+        }
+        _ => bail!("tls-key and tls-cert must be specified"),
+    };
+
+    // Start listening for incoming client connections
+    let proxy_address: SocketAddr = args
+        .get_one::<String>("listen")
+        .expect("string argument defined")
+        .parse()?;
+    info!("Starting sni router on {proxy_address}");
+    let proxy_listener = TcpListener::bind(proxy_address).await?;
+
+    let cancellation_token = CancellationToken::new();
+
+    let main = tokio::spawn(task_main(
+        Arc::new(destination),
+        tls_config,
+        tls_server_end_point,
+        proxy_listener,
+        cancellation_token.clone(),
+    ));
+    let signals_task = tokio::spawn(crate::signals::handle(cancellation_token, || {}));
+
+    // the signal task can't ever succeed.
+    // the main task can error, or can succeed on cancellation.
+    // we want to immediately exit on either of these cases
+    let signal = match futures::future::select(signals_task, main).await {
+        Either::Left((res, _)) => crate::error::flatten_err(res)?,
+        Either::Right((res, _)) => return crate::error::flatten_err(res),
+    };
+
+    // maintenance tasks return `Infallible` success values; this is an impossible value,
+    // so this match statically ensures that there are no possibilities for that value
+    match signal {}
+}
+
+async fn task_main(
+    dest_suffix: Arc<String>,
+    tls_config: Arc<rustls::ServerConfig>,
+    tls_server_end_point: TlsServerEndPoint,
+    listener: tokio::net::TcpListener,
+    cancellation_token: CancellationToken,
+) -> anyhow::Result<()> {
+    // When set for the server socket, the keepalive setting
+    // will be inherited by all accepted client sockets.
+    socket2::SockRef::from(&listener).set_keepalive(true)?;
+
+    let connections = tokio_util::task::task_tracker::TaskTracker::new();
+
+    while let Some(accept_result) =
+        run_until_cancelled(listener.accept(), &cancellation_token).await
+    {
+        let (socket, peer_addr) = accept_result?;
+
+        let session_id = uuid::Uuid::new_v4();
+        let tls_config = Arc::clone(&tls_config);
+        let dest_suffix = Arc::clone(&dest_suffix);
+
+        connections.spawn(
+            async move {
+                socket
+                    .set_nodelay(true)
+                    .context("failed to set socket option")?;
+
+                info!(%peer_addr, "serving");
+                let ctx = RequestContext::new(
+                    session_id,
+                    ConnectionInfo {
+                        addr: peer_addr,
+                        extra: None,
+                    },
+                    crate::metrics::Protocol::SniRouter,
+                    "sni",
+                );
+                handle_client(ctx, dest_suffix, tls_config, tls_server_end_point, socket).await
+            }
+            .unwrap_or_else(|e| {
+                // Acknowledge that the task has finished with an error.
+                error!("per-client task finished with an error: {e:#}");
+            })
+            .instrument(tracing::info_span!("handle_client", ?session_id)),
+        );
+    }
+
+    connections.close();
+    drop(listener);
+
+    connections.wait().await;
+
+    info!("all client connections have finished");
+    Ok(())
+}
+
+const ERR_INSECURE_CONNECTION: &str = "connection is insecure (try using `sslmode=require`)";
+
+async fn ssl_handshake<S: AsyncRead + AsyncWrite + Unpin>(
+    ctx: &RequestContext,
+    raw_stream: S,
+    tls_config: Arc<rustls::ServerConfig>,
+    tls_server_end_point: TlsServerEndPoint,
+) -> anyhow::Result<Stream<S>> {
+    let mut stream = PqStream::new(Stream::from_raw(raw_stream));
+
+    let msg = stream.read_startup_packet().await?;
+    use pq_proto::FeStartupPacket::SslRequest;
+
+    match msg {
+        SslRequest { direct: false } => {
+            stream
+                .write_message(&pq_proto::BeMessage::EncryptionResponse(true))
+                .await?;
+
+            // Upgrade raw stream into a secure TLS-backed stream.
+            // NOTE: We've consumed `tls`; this fact will be used later.
+
+            let (raw, read_buf) = stream.into_inner();
+            // TODO: Normally, client doesn't send any data before
+            // server says TLS handshake is ok and read_buf is empty.
+            // However, you could imagine pipelining of postgres
+            // SSLRequest + TLS ClientHello in one hunk similar to
+            // pipelining in our node js driver. We should probably
+            // support that by chaining read_buf with the stream.
+            if !read_buf.is_empty() {
+                bail!("data is sent before server replied with EncryptionResponse");
+            }
+
+            Ok(Stream::Tls {
+                tls: Box::new(
+                    raw.upgrade(tls_config, !ctx.has_private_peer_addr())
+                        .await?,
+                ),
+                tls_server_end_point,
+            })
+        }
+        unexpected => {
+            info!(
+                ?unexpected,
+                "unexpected startup packet, rejecting connection"
+            );
+            stream
+                .throw_error_str(ERR_INSECURE_CONNECTION, crate::error::ErrorKind::User)
+                .await?
+        }
+    }
+}
+
+async fn handle_client(
+    ctx: RequestContext,
+    dest_suffix: Arc<String>,
+    tls_config: Arc<rustls::ServerConfig>,
+    tls_server_end_point: TlsServerEndPoint,
+    stream: impl AsyncRead + AsyncWrite + Unpin,
+) -> anyhow::Result<()> {
+    let mut tls_stream = ssl_handshake(&ctx, stream, tls_config, tls_server_end_point).await?;
+
+    // Cut off first part of the SNI domain
+    // We receive required destination details in the format of
+    // `{k8s_service_name}--{k8s_namespace}--{port}.non-sni-domain`
+    let sni = tls_stream.sni_hostname().ok_or(anyhow!("SNI missing"))?;
+    let dest: Vec<&str> = sni
+        .split_once('.')
+        .context("invalid SNI")?
+        .0
+        .splitn(3, "--")
+        .collect();
+    let port = dest[2].parse::<u16>().context("invalid port")?;
+    let destination = format!("{}.{}.{}:{}", dest[0], dest[1], dest_suffix, port);
+
+    info!("destination: {}", destination);
+
+    let mut client = tokio::net::TcpStream::connect(destination).await?;
+
+    // doesn't yet matter as pg-sni-router doesn't report analytics logs
+    ctx.set_success();
+    ctx.log_connect();
+
+    // Starting from here we only proxy the client's traffic.
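+    // Bytes are shuffled in both directions between the TLS client stream and
+    // the plain TCP connection to the destination until one side closes; any
+    // error is tagged with the side it came from (ErrorSource::Client or
+    // ErrorSource::Compute), which the match below turns into error context.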
+ info!("performing the proxy pass..."); + + match copy_bidirectional_client_compute(&mut tls_stream, &mut client).await { + Ok(_) => Ok(()), + Err(ErrorSource::Client(err)) => Err(err).context("client"), + Err(ErrorSource::Compute(err)) => Err(err).context("compute"), + } +} diff --git a/proxy/src/binary/proxy.rs b/proxy/src/binary/proxy.rs new file mode 100644 index 0000000000..e38c49ca10 --- /dev/null +++ b/proxy/src/binary/proxy.rs @@ -0,0 +1,827 @@ +use std::net::SocketAddr; +use std::pin::pin; +use std::sync::Arc; +use std::time::Duration; + +use crate::auth::backend::jwt::JwkCache; +use crate::auth::backend::{AuthRateLimiter, ConsoleRedirectBackend, MaybeOwned}; +use crate::cancellation::{handle_cancel_messages, CancellationHandler}; +use crate::config::{ + self, remote_storage_from_toml, AuthenticationConfig, CacheOptions, ComputeConfig, HttpConfig, + ProjectInfoCacheOptions, ProxyConfig, ProxyProtocolV2, +}; +use crate::context::parquet::ParquetUploadArgs; +use crate::http::health_server::AppMetrics; +use crate::metrics::Metrics; +use crate::rate_limiter::{ + EndpointRateLimiter, LeakyBucketConfig, RateBucketInfo, WakeComputeRateLimiter, +}; +use crate::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider; +use crate::redis::kv_ops::RedisKVClient; +use crate::redis::{elasticache, notifications}; +use crate::scram::threadpool::ThreadPool; +use crate::serverless::cancel_set::CancelSet; +use crate::serverless::GlobalConnPoolOptions; +use crate::tls::client_config::compute_client_config_with_root_certs; +use crate::{auth, control_plane, http, serverless, usage_metrics}; +use anyhow::bail; +use futures::future::Either; +use remote_storage::RemoteStorageConfig; +use tokio::net::TcpListener; +use tokio::task::JoinSet; +use tokio_util::sync::CancellationToken; +use tracing::{info, warn, Instrument}; +use utils::sentry_init::init_sentry; +use utils::{project_build_tag, project_git_version}; + +project_git_version!(GIT_VERSION); +project_build_tag!(BUILD_TAG); + +use clap::{Parser, ValueEnum}; + +#[derive(Clone, Debug, ValueEnum)] +enum AuthBackendType { + #[value(name("cplane-v1"), alias("control-plane"))] + ControlPlaneV1, + + #[value(name("link"), alias("control-redirect"))] + ConsoleRedirect, + + #[cfg(any(test, feature = "testing"))] + Postgres, +} + +/// Neon proxy/router +#[derive(Parser)] +#[command(version = GIT_VERSION, about)] +struct ProxyCliArgs { + /// Name of the region this proxy is deployed in + #[clap(long, default_value_t = String::new())] + region: String, + /// listen for incoming client connections on ip:port + #[clap(short, long, default_value = "127.0.0.1:4432")] + proxy: String, + #[clap(value_enum, long, default_value_t = AuthBackendType::ConsoleRedirect)] + auth_backend: AuthBackendType, + /// listen for management callback connection on ip:port + #[clap(short, long, default_value = "127.0.0.1:7000")] + mgmt: String, + /// listen for incoming http connections (metrics, etc) on ip:port + #[clap(long, default_value = "127.0.0.1:7001")] + http: String, + /// listen for incoming wss connections on ip:port + #[clap(long)] + wss: Option, + /// redirect unauthenticated users to the given uri in case of console redirect auth + #[clap(short, long, default_value = "http://localhost:3000/psql_session/")] + uri: String, + /// cloud API endpoint for authenticating users + #[clap( + short, + long, + default_value = "http://localhost:3000/authenticate_proxy_request/" + )] + auth_endpoint: String, + /// JWT used to connect to control plane. 
+    #[clap(
+        long,
+        value_name = "JWT",
+        default_value = "",
+        env = "NEON_PROXY_TO_CONTROLPLANE_TOKEN"
+    )]
+    control_plane_token: Arc<str>,
+    /// if this is not local proxy, this toggles whether we accept jwt or passwords for http
+    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
+    is_auth_broker: bool,
+    /// path to TLS key for client postgres connections
+    ///
+    /// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
+    #[clap(short = 'k', long, alias = "ssl-key")]
+    tls_key: Option<String>,
+    /// path to TLS cert for client postgres connections
+    ///
+    /// tls-key and tls-cert are for backwards compatibility, we can put all certs in one dir
+    #[clap(short = 'c', long, alias = "ssl-cert")]
+    tls_cert: Option<String>,
+    /// Allow writing TLS session keys to the given file pointed to by the environment variable `SSLKEYLOGFILE`.
+    #[clap(long, alias = "allow-ssl-keylogfile")]
+    allow_tls_keylogfile: bool,
+    /// path to directory with TLS certificates for client postgres connections
+    #[clap(long)]
+    certs_dir: Option<String>,
+    /// timeout for the TLS handshake
+    #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
+    handshake_timeout: tokio::time::Duration,
+    /// http endpoint to receive periodic metric updates
+    #[clap(long)]
+    metric_collection_endpoint: Option<String>,
+    /// how often metrics should be sent to a collection endpoint
+    #[clap(long)]
+    metric_collection_interval: Option<String>,
+    /// cache for `wake_compute` api method (use `size=0` to disable)
+    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
+    wake_compute_cache: String,
+    /// lock for `wake_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
+    #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_WAKE_COMPUTE_LOCK)]
+    wake_compute_lock: String,
+    /// lock for `connect_compute` api method. example: "shards=32,permits=4,epoch=10m,timeout=1s". (use `permits=0` to disable).
+    #[clap(long, default_value = config::ConcurrencyLockOptions::DEFAULT_OPTIONS_CONNECT_COMPUTE_LOCK)]
+    connect_compute_lock: String,
+    #[clap(flatten)]
+    sql_over_http: SqlOverHttpArgs,
+    /// timeout for scram authentication protocol
+    #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
+    scram_protocol_timeout: tokio::time::Duration,
+    /// size of the threadpool for password hashing
+    #[clap(long, default_value_t = 4)]
+    scram_thread_pool_size: u8,
+    /// Endpoint rate limiter max number of requests per second.
+    ///
+    /// Provided in the form `<rps>@<interval>`.
+    /// Can be given multiple times for different bucket sizes.
+    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_ENDPOINT_SET)]
+    endpoint_rps_limit: Vec<RateBucketInfo>,
+    /// Wake compute rate limiter max number of requests per second.
+    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_SET)]
+    wake_compute_limit: Vec<RateBucketInfo>,
+    /// Whether the auth rate limiter actually takes effect (for testing)
+    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
+    auth_rate_limit_enabled: bool,
+    /// Authentication rate limiter max number of hashes per second.
+    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_AUTH_SET)]
+    auth_rate_limit: Vec<RateBucketInfo>,
+    /// The IP subnet to use when deciding whether two IP addresses are considered the same.
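+    /// E.g. with the default of 64, two IPv6 addresses within the same /64 subnet
+    /// count as the same address for rate limiting.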
+    #[clap(long, default_value_t = 64)]
+    auth_rate_limit_ip_subnet: u8,
+    /// Redis rate limiter max number of requests per second.
+    #[clap(long, default_values_t = RateBucketInfo::DEFAULT_REDIS_SET)]
+    redis_rps_limit: Vec<RateBucketInfo>,
+    /// Cancellation channel size (max queue size for redis kv client)
+    #[clap(long, default_value = "1024")]
+    cancellation_ch_size: usize,
+    /// cache for `allowed_ips` (use `size=0` to disable)
+    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
+    allowed_ips_cache: String,
+    /// cache for `role_secret` (use `size=0` to disable)
+    #[clap(long, default_value = config::CacheOptions::CACHE_DEFAULT_OPTIONS)]
+    role_secret_cache: String,
+    /// redis url for notifications (if empty, redis_host:port will be used for both notifications and streaming connections)
+    #[clap(long)]
+    redis_notifications: Option<String>,
+    /// which of the available authentication types to use for the regional redis we have. Supported are "irsa" and "plain".
+    #[clap(long, default_value = "irsa")]
+    redis_auth_type: String,
+    /// redis host for streaming connections (might be different from the notifications host)
+    #[clap(long)]
+    redis_host: Option<String>,
+    /// redis port for streaming connections (might be different from the notifications host)
+    #[clap(long)]
+    redis_port: Option<u16>,
+    /// redis cluster name, used in aws elasticache
+    #[clap(long)]
+    redis_cluster_name: Option<String>,
+    /// redis user_id, used in aws elasticache
+    #[clap(long)]
+    redis_user_id: Option<String>,
+    /// aws region to retrieve credentials
+    #[clap(long, default_value_t = String::new())]
+    aws_region: String,
+    /// cache for `project_info` (use `size=0` to disable)
+    #[clap(long, default_value = config::ProjectInfoCacheOptions::CACHE_DEFAULT_OPTIONS)]
+    project_info_cache: String,
+    /// cache for all valid endpoints
+    #[clap(long, default_value = config::EndpointCacheConfig::CACHE_DEFAULT_OPTIONS)]
+    endpoint_cache_config: String,
+    #[clap(flatten)]
+    parquet_upload: ParquetUploadArgs,
+
+    /// interval for backup metric collection
+    #[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
+    metric_backup_collection_interval: std::time::Duration,
+    /// remote storage configuration for backup metric collection
+    /// Encoded as toml (same format as pageservers), eg
+    /// `{bucket_name='the-bucket',bucket_region='us-east-1',prefix_in_bucket='proxy',endpoint='http://minio:9000'}`
+    #[clap(long, value_parser = remote_storage_from_toml)]
+    metric_backup_collection_remote_storage: Option<RemoteStorageConfig>,
+    /// chunk size for backup metric collection
+    /// Size of each event is no more than 400 bytes, so 2**22 is about 200MB before the compression.
+    #[clap(long, default_value = "4194304")]
+    metric_backup_collection_chunk_size: usize,
+    /// Whether to retry the connection to the compute node
+    #[clap(long, default_value = config::RetryConfig::CONNECT_TO_COMPUTE_DEFAULT_VALUES)]
+    connect_to_compute_retry: String,
+    /// Whether to retry the wake_compute request
+    #[clap(long, default_value = config::RetryConfig::WAKE_COMPUTE_DEFAULT_VALUES)]
+    wake_compute_retry: String,
+
+    /// Configure if this is a private access proxy for the POC: In that case the proxy will ignore the IP allowlist
+    #[clap(long, default_value_t = false, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
+    is_private_access_proxy: bool,
+
+    /// Configure whether all incoming requests have a Proxy Protocol V2 packet.
+    // TODO(conradludgate): switch default to rejected or required once we've updated all deployments
+    #[clap(value_enum, long, default_value_t = ProxyProtocolV2::Supported)]
+    proxy_protocol_v2: ProxyProtocolV2,
+
+    /// Time the proxy waits for the webauth session to be confirmed by the control plane.
+    // TODO: rename to `console_redirect_confirmation_timeout`.
+    #[clap(long, default_value = "2m", value_parser = humantime::parse_duration)]
+    webauth_confirmation_timeout: std::time::Duration,
+}
+
+#[derive(clap::Args, Clone, Copy, Debug)]
+struct SqlOverHttpArgs {
+    /// timeout for http connection requests
+    #[clap(long, default_value = "15s", value_parser = humantime::parse_duration)]
+    sql_over_http_timeout: tokio::time::Duration,
+
+    /// Whether the SQL over http pool is opt-in
+    #[clap(long, default_value_t = true, value_parser = clap::builder::BoolishValueParser::new(), action = clap::ArgAction::Set)]
+    sql_over_http_pool_opt_in: bool,
+
+    /// How many connections to pool for each endpoint. Excess connections are discarded
+    #[clap(long, default_value_t = 20)]
+    sql_over_http_pool_max_conns_per_endpoint: usize,
+
+    /// How many connections to pool in total. Excess connections are discarded
+    #[clap(long, default_value_t = 20000)]
+    sql_over_http_pool_max_total_conns: usize,
+
+    /// How long pooled connections should remain idle for before closing
+    #[clap(long, default_value = "5m", value_parser = humantime::parse_duration)]
+    sql_over_http_idle_timeout: tokio::time::Duration,
+
+    /// Duration each shard will wait on average before a GC sweep.
+    /// A longer time will cause sweeps to take longer but will interfere less frequently.
+    #[clap(long, default_value = "10m", value_parser = humantime::parse_duration)]
+    sql_over_http_pool_gc_epoch: tokio::time::Duration,
+
+    /// How many shards should the global pool have. Must be a power of two.
+ /// More shards will introduce less contention for pool operations, but can + /// increase memory used by the pool + #[clap(long, default_value_t = 128)] + sql_over_http_pool_shards: usize, + + #[clap(long, default_value_t = 10000)] + sql_over_http_client_conn_threshold: u64, + + #[clap(long, default_value_t = 64)] + sql_over_http_cancel_set_shards: usize, + + #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB + sql_over_http_max_request_size_bytes: usize, + + #[clap(long, default_value_t = 10 * 1024 * 1024)] // 10 MiB + sql_over_http_max_response_size_bytes: usize, +} + +pub async fn run() -> anyhow::Result<()> { + let _logging_guard = crate::logging::init().await?; + let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); + let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); + + // TODO: refactor these to use labels + info!("Version: {GIT_VERSION}"); + info!("Build_tag: {BUILD_TAG}"); + let neon_metrics = ::metrics::NeonMetrics::new(::metrics::BuildInfo { + revision: GIT_VERSION, + build_tag: BUILD_TAG, + }); + + let jemalloc = match crate::jemalloc::MetricRecorder::new() { + Ok(t) => Some(t), + Err(e) => { + tracing::error!(error = ?e, "could not start jemalloc metrics loop"); + None + } + }; + + let args = ProxyCliArgs::parse(); + let config = build_config(&args)?; + let auth_backend = build_auth_backend(&args)?; + + match auth_backend { + Either::Left(auth_backend) => info!("Authentication backend: {auth_backend}"), + Either::Right(auth_backend) => info!("Authentication backend: {auth_backend:?}"), + }; + info!("Using region: {}", args.aws_region); + + // TODO: untangle the config args + let regional_redis_client = match (args.redis_auth_type.as_str(), &args.redis_notifications) { + ("plain", redis_url) => match redis_url { + None => { + bail!("plain auth requires redis_notifications to be set"); + } + Some(url) => Some( + ConnectionWithCredentialsProvider::new_with_static_credentials(url.to_string()), + ), + }, + ("irsa", _) => match (&args.redis_host, args.redis_port) { + (Some(host), Some(port)) => Some( + ConnectionWithCredentialsProvider::new_with_credentials_provider( + host.to_string(), + port, + elasticache::CredentialsProvider::new( + args.aws_region, + args.redis_cluster_name, + args.redis_user_id, + ) + .await, + ), + ), + (None, None) => { + warn!("irsa auth requires redis-host and redis-port to be set, continuing without regional_redis_client"); + None + } + _ => { + bail!("redis-host and redis-port must be specified together"); + } + }, + _ => { + bail!("unknown auth type given"); + } + }; + + let redis_notifications_client = if let Some(url) = args.redis_notifications { + Some(ConnectionWithCredentialsProvider::new_with_static_credentials(url)) + } else { + regional_redis_client.clone() + }; + + // Check that we can bind to address before further initialization + let http_address: SocketAddr = args.http.parse()?; + info!("Starting http on {http_address}"); + let http_listener = TcpListener::bind(http_address).await?.into_std()?; + + let mgmt_address: SocketAddr = args.mgmt.parse()?; + info!("Starting mgmt on {mgmt_address}"); + let mgmt_listener = TcpListener::bind(mgmt_address).await?; + + let proxy_listener = if args.is_auth_broker { + None + } else { + let proxy_address: SocketAddr = args.proxy.parse()?; + info!("Starting proxy on {proxy_address}"); + + Some(TcpListener::bind(proxy_address).await?) + }; + + // TODO: rename the argument to something like serverless. 
+    // It now covers more than just websockets; it also covers SQL over HTTP.
+    let serverless_listener = if let Some(serverless_address) = args.wss {
+        let serverless_address: SocketAddr = serverless_address.parse()?;
+        info!("Starting wss on {serverless_address}");
+        Some(TcpListener::bind(serverless_address).await?)
+    } else if args.is_auth_broker {
+        bail!("wss arg must be present for auth-broker")
+    } else {
+        None
+    };
+
+    let cancellation_token = CancellationToken::new();
+
+    let redis_rps_limit = Vec::leak(args.redis_rps_limit.clone());
+    RateBucketInfo::validate(redis_rps_limit)?;
+
+    let redis_kv_client = regional_redis_client
+        .as_ref()
+        .map(|redis_publisher| RedisKVClient::new(redis_publisher.clone(), redis_rps_limit));
+
+    // channel size should be higher than redis client limit to avoid blocking
+    let cancel_ch_size = args.cancellation_ch_size;
+    let (tx_cancel, rx_cancel) = tokio::sync::mpsc::channel(cancel_ch_size);
+    let cancellation_handler = Arc::new(CancellationHandler::new(
+        &config.connect_to_compute,
+        Some(tx_cancel),
+    ));
+
+    // bit of a hack - find the min rps and max rps supported and turn it into
+    // leaky bucket config instead
+    let max = args
+        .endpoint_rps_limit
+        .iter()
+        .map(|x| x.rps())
+        .max_by(f64::total_cmp)
+        .unwrap_or(EndpointRateLimiter::DEFAULT.max);
+    let rps = args
+        .endpoint_rps_limit
+        .iter()
+        .map(|x| x.rps())
+        .min_by(f64::total_cmp)
+        .unwrap_or(EndpointRateLimiter::DEFAULT.rps);
+    let endpoint_rate_limiter = Arc::new(EndpointRateLimiter::new_with_shards(
+        LeakyBucketConfig { rps, max },
+        64,
+    ));
+
+    // client facing tasks. these will exit on error or on cancellation
+    // cancellation returns Ok(())
+    let mut client_tasks = JoinSet::new();
+    match auth_backend {
+        Either::Left(auth_backend) => {
+            if let Some(proxy_listener) = proxy_listener {
+                client_tasks.spawn(crate::proxy::task_main(
+                    config,
+                    auth_backend,
+                    proxy_listener,
+                    cancellation_token.clone(),
+                    cancellation_handler.clone(),
+                    endpoint_rate_limiter.clone(),
+                ));
+            }
+
+            if let Some(serverless_listener) = serverless_listener {
+                client_tasks.spawn(serverless::task_main(
+                    config,
+                    auth_backend,
+                    serverless_listener,
+                    cancellation_token.clone(),
+                    cancellation_handler.clone(),
+                    endpoint_rate_limiter.clone(),
+                ));
+            }
+        }
+        Either::Right(auth_backend) => {
+            if let Some(proxy_listener) = proxy_listener {
+                client_tasks.spawn(crate::console_redirect_proxy::task_main(
+                    config,
+                    auth_backend,
+                    proxy_listener,
+                    cancellation_token.clone(),
+                    cancellation_handler.clone(),
+                ));
+            }
+        }
+    }
+
+    client_tasks.spawn(crate::context::parquet::worker(
+        cancellation_token.clone(),
+        args.parquet_upload,
+    ));
+
+    // maintenance tasks. these never return unless there's an error
+    let mut maintenance_tasks = JoinSet::new();
+    maintenance_tasks.spawn(crate::signals::handle(cancellation_token.clone(), || {}));
+    maintenance_tasks.spawn(http::health_server::task_main(
+        http_listener,
+        AppMetrics {
+            jemalloc,
+            neon_metrics,
+            proxy: crate::metrics::Metrics::get(),
+        },
+    ));
+    maintenance_tasks.spawn(control_plane::mgmt::task_main(mgmt_listener));
+
+    if let Some(metrics_config) = &config.metric_collection {
+        // TODO: Add gc regardless of the metric collection being enabled.
+ maintenance_tasks.spawn(usage_metrics::task_main(metrics_config)); + } + + #[cfg_attr(not(any(test, feature = "testing")), expect(irrefutable_let_patterns))] + if let Either::Left(auth::Backend::ControlPlane(api, ())) = &auth_backend { + if let crate::control_plane::client::ControlPlaneClient::ProxyV1(api) = &**api { + match (redis_notifications_client, regional_redis_client.clone()) { + (None, None) => {} + (client1, client2) => { + let cache = api.caches.project_info.clone(); + if let Some(client) = client1 { + maintenance_tasks.spawn(notifications::task_main( + client, + cache.clone(), + args.region.clone(), + )); + } + if let Some(client) = client2 { + maintenance_tasks.spawn(notifications::task_main( + client, + cache.clone(), + args.region.clone(), + )); + } + maintenance_tasks.spawn(async move { cache.clone().gc_worker().await }); + } + } + + if let Some(mut redis_kv_client) = redis_kv_client { + maintenance_tasks.spawn(async move { + redis_kv_client.try_connect().await?; + handle_cancel_messages(&mut redis_kv_client, rx_cancel).await + }); + } + + if let Some(regional_redis_client) = regional_redis_client { + let cache = api.caches.endpoints_cache.clone(); + let con = regional_redis_client; + let span = tracing::info_span!("endpoints_cache"); + maintenance_tasks.spawn( + async move { cache.do_read(con, cancellation_token.clone()).await } + .instrument(span), + ); + } + } + } + + let maintenance = loop { + // get one complete task + match futures::future::select( + pin!(maintenance_tasks.join_next()), + pin!(client_tasks.join_next()), + ) + .await + { + // exit immediately on maintenance task completion + Either::Left((Some(res), _)) => break crate::error::flatten_err(res)?, + // exit with error immediately if all maintenance tasks have ceased (should be caught by branch above) + Either::Left((None, _)) => bail!("no maintenance tasks running. invalid state"), + // exit immediately on client task error + Either::Right((Some(res), _)) => crate::error::flatten_err(res)?, + // exit if all our client tasks have shutdown gracefully + Either::Right((None, _)) => return Ok(()), + } + }; + + // maintenance tasks return Infallible success values, this is an impossible value + // so this match statically ensures that there are no possibilities for that value + match maintenance {} +} + +/// ProxyConfig is created at proxy startup, and lives forever. 
+fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> { + let thread_pool = ThreadPool::new(args.scram_thread_pool_size); + Metrics::install(thread_pool.metrics.clone()); + + let tls_config = match (&args.tls_key, &args.tls_cert) { + (Some(key_path), Some(cert_path)) => Some(config::configure_tls( + key_path, + cert_path, + args.certs_dir.as_ref(), + args.allow_tls_keylogfile, + )?), + (None, None) => None, + _ => bail!("either both or neither tls-key and tls-cert must be specified"), + }; + + let backup_metric_collection_config = config::MetricBackupCollectionConfig { + remote_storage_config: args.metric_backup_collection_remote_storage.clone(), + chunk_size: args.metric_backup_collection_chunk_size, + }; + + let metric_collection = match ( + &args.metric_collection_endpoint, + &args.metric_collection_interval, + ) { + (Some(endpoint), Some(interval)) => Some(config::MetricCollectionConfig { + endpoint: endpoint.parse()?, + interval: humantime::parse_duration(interval)?, + backup_metric_collection_config, + }), + (None, None) => None, + _ => bail!( + "either both or neither metric-collection-endpoint \ + and metric-collection-interval must be specified" + ), + }; + + let config::ConcurrencyLockOptions { + shards, + limiter, + epoch, + timeout, + } = args.connect_compute_lock.parse()?; + info!( + ?limiter, + shards, + ?epoch, + "Using NodeLocks (connect_compute)" + ); + let connect_compute_locks = control_plane::locks::ApiLocks::new( + "connect_compute_lock", + limiter, + shards, + timeout, + epoch, + &Metrics::get().proxy.connect_compute_lock, + ); + + let http_config = HttpConfig { + accept_websockets: !args.is_auth_broker, + pool_options: GlobalConnPoolOptions { + max_conns_per_endpoint: args.sql_over_http.sql_over_http_pool_max_conns_per_endpoint, + gc_epoch: args.sql_over_http.sql_over_http_pool_gc_epoch, + pool_shards: args.sql_over_http.sql_over_http_pool_shards, + idle_timeout: args.sql_over_http.sql_over_http_idle_timeout, + opt_in: args.sql_over_http.sql_over_http_pool_opt_in, + max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns, + }, + cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards), + client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold, + max_request_size_bytes: args.sql_over_http.sql_over_http_max_request_size_bytes, + max_response_size_bytes: args.sql_over_http.sql_over_http_max_response_size_bytes, + }; + let authentication_config = AuthenticationConfig { + jwks_cache: JwkCache::default(), + thread_pool, + scram_protocol_timeout: args.scram_protocol_timeout, + rate_limiter_enabled: args.auth_rate_limit_enabled, + rate_limiter: AuthRateLimiter::new(args.auth_rate_limit.clone()), + rate_limit_ip_subnet: args.auth_rate_limit_ip_subnet, + ip_allowlist_check_enabled: !args.is_private_access_proxy, + is_vpc_acccess_proxy: args.is_private_access_proxy, + is_auth_broker: args.is_auth_broker, + accept_jwts: args.is_auth_broker, + console_redirect_confirmation_timeout: args.webauth_confirmation_timeout, + }; + + let compute_config = ComputeConfig { + retry: config::RetryConfig::parse(&args.connect_to_compute_retry)?, + tls: Arc::new(compute_client_config_with_root_certs()?), + timeout: Duration::from_secs(2), + }; + + let config = ProxyConfig { + tls_config, + metric_collection, + http_config, + authentication_config, + proxy_protocol_v2: args.proxy_protocol_v2, + handshake_timeout: args.handshake_timeout, + region: args.region.clone(), + wake_compute_retry_config: 
config::RetryConfig::parse(&args.wake_compute_retry)?, + connect_compute_locks, + connect_to_compute: compute_config, + }; + + let config = Box::leak(Box::new(config)); + + tokio::spawn(config.connect_compute_locks.garbage_collect_worker()); + + Ok(config) +} + +/// auth::Backend is created at proxy startup, and lives forever. +fn build_auth_backend( + args: &ProxyCliArgs, +) -> anyhow::Result, &'static ConsoleRedirectBackend>> { + match &args.auth_backend { + AuthBackendType::ControlPlaneV1 => { + let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?; + let project_info_cache_config: ProjectInfoCacheOptions = + args.project_info_cache.parse()?; + let endpoint_cache_config: config::EndpointCacheConfig = + args.endpoint_cache_config.parse()?; + + info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}"); + info!( + "Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}" + ); + info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}"); + let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new( + wake_compute_cache_config, + project_info_cache_config, + endpoint_cache_config, + ))); + + let config::ConcurrencyLockOptions { + shards, + limiter, + epoch, + timeout, + } = args.wake_compute_lock.parse()?; + info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)"); + let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new( + "wake_compute_lock", + limiter, + shards, + timeout, + epoch, + &Metrics::get().wake_compute_lock, + ))); + tokio::spawn(locks.garbage_collect_worker()); + + let url: crate::url::ApiUrl = args.auth_endpoint.parse()?; + + let endpoint = http::Endpoint::new(url, http::new_client()); + + let mut wake_compute_rps_limit = args.wake_compute_limit.clone(); + RateBucketInfo::validate(&mut wake_compute_rps_limit)?; + let wake_compute_endpoint_rate_limiter = + Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit)); + + let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new( + endpoint, + args.control_plane_token.clone(), + caches, + locks, + wake_compute_endpoint_rate_limiter, + ); + + let api = control_plane::client::ControlPlaneClient::ProxyV1(api); + let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ()); + let config = Box::leak(Box::new(auth_backend)); + + Ok(Either::Left(config)) + } + + #[cfg(any(test, feature = "testing"))] + AuthBackendType::Postgres => { + let url = args.auth_endpoint.parse()?; + let api = control_plane::client::mock::MockControlPlane::new( + url, + !args.is_private_access_proxy, + ); + let api = control_plane::client::ControlPlaneClient::PostgresMock(api); + + let auth_backend = auth::Backend::ControlPlane(MaybeOwned::Owned(api), ()); + + let config = Box::leak(Box::new(auth_backend)); + + Ok(Either::Left(config)) + } + + AuthBackendType::ConsoleRedirect => { + let wake_compute_cache_config: CacheOptions = args.wake_compute_cache.parse()?; + let project_info_cache_config: ProjectInfoCacheOptions = + args.project_info_cache.parse()?; + let endpoint_cache_config: config::EndpointCacheConfig = + args.endpoint_cache_config.parse()?; + + info!("Using NodeInfoCache (wake_compute) with options={wake_compute_cache_config:?}"); + info!( + "Using AllowedIpsCache (wake_compute) with options={project_info_cache_config:?}" + ); + info!("Using EndpointCacheConfig with options={endpoint_cache_config:?}"); + let caches = Box::leak(Box::new(control_plane::caches::ApiCaches::new( + 
wake_compute_cache_config, + project_info_cache_config, + endpoint_cache_config, + ))); + + let config::ConcurrencyLockOptions { + shards, + limiter, + epoch, + timeout, + } = args.wake_compute_lock.parse()?; + info!(?limiter, shards, ?epoch, "Using NodeLocks (wake_compute)"); + let locks = Box::leak(Box::new(control_plane::locks::ApiLocks::new( + "wake_compute_lock", + limiter, + shards, + timeout, + epoch, + &Metrics::get().wake_compute_lock, + ))); + + let url = args.uri.clone().parse()?; + let ep_url: crate::url::ApiUrl = args.auth_endpoint.parse()?; + let endpoint = http::Endpoint::new(ep_url, http::new_client()); + let mut wake_compute_rps_limit = args.wake_compute_limit.clone(); + RateBucketInfo::validate(&mut wake_compute_rps_limit)?; + let wake_compute_endpoint_rate_limiter = + Arc::new(WakeComputeRateLimiter::new(wake_compute_rps_limit)); + + // Since we use only get_allowed_ips_and_secret() wake_compute_endpoint_rate_limiter + // and locks are not used in ConsoleRedirectBackend, + // but they are required by the NeonControlPlaneClient + let api = control_plane::client::cplane_proxy_v1::NeonControlPlaneClient::new( + endpoint, + args.control_plane_token.clone(), + caches, + locks, + wake_compute_endpoint_rate_limiter, + ); + + let backend = ConsoleRedirectBackend::new(url, api); + let config = Box::leak(Box::new(backend)); + + Ok(Either::Right(config)) + } + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use crate::rate_limiter::RateBucketInfo; + use clap::Parser; + + #[test] + fn parse_endpoint_rps_limit() { + let config = super::ProxyCliArgs::parse_from([ + "proxy", + "--endpoint-rps-limit", + "100@1s", + "--endpoint-rps-limit", + "20@30s", + ]); + + assert_eq!( + config.endpoint_rps_limit, + vec![ + RateBucketInfo::new(100, Duration::from_secs(1)), + RateBucketInfo::new(20, Duration::from_secs(30)), + ] + ); + } +} diff --git a/proxy/src/compute_ctl/mod.rs b/proxy/src/compute_ctl/mod.rs index 60fdf107d4..ab3179afb2 100644 --- a/proxy/src/compute_ctl/mod.rs +++ b/proxy/src/compute_ctl/mod.rs @@ -42,14 +42,14 @@ pub enum Privilege { #[derive(Error, Debug)] pub enum ComputeCtlError { #[error("connection error: {0}")] - ConnectionError(#[source] reqwest_middleware::Error), + Connection(#[source] reqwest_middleware::Error), #[error("request error [{status}]: {body:?}")] - RequestError { + Request { status: StatusCode, body: Option, }, #[error("response parsing error: {0}")] - ResponseError(#[source] reqwest::Error), + Response(#[source] reqwest::Error), } impl ComputeCtlApi { @@ -89,14 +89,14 @@ impl ComputeCtlApi { .json(req) .send() .await - .map_err(ComputeCtlError::ConnectionError)?; + .map_err(ComputeCtlError::Connection)?; let status = resp.status(); if status.is_client_error() || status.is_server_error() { let body = resp.json().await.ok(); - return Err(ComputeCtlError::RequestError { status, body }); + return Err(ComputeCtlError::Request { status, body }); } - resp.json().await.map_err(ComputeCtlError::ResponseError) + resp.json().await.map_err(ComputeCtlError::Response) } } diff --git a/proxy/src/config.rs b/proxy/src/config.rs index 1dcd37712e..460e0cff54 100644 --- a/proxy/src/config.rs +++ b/proxy/src/config.rs @@ -151,7 +151,6 @@ impl FromStr for EndpointCacheConfig { } #[derive(Debug)] pub struct MetricBackupCollectionConfig { - pub interval: Duration, pub remote_storage_config: Option, pub chunk_size: usize, } diff --git a/proxy/src/control_plane/client/mod.rs b/proxy/src/control_plane/client/mod.rs index a06943726e..c28ff4789d 100644 --- 
a/proxy/src/control_plane/client/mod.rs +++ b/proxy/src/control_plane/client/mod.rs @@ -212,15 +212,15 @@ impl ApiLocks { timeout: Duration, epoch: std::time::Duration, metrics: &'static ApiLockMetrics, - ) -> prometheus::Result { - Ok(Self { + ) -> Self { + Self { name, node_locks: ClashMap::with_shard_amount(shards), config, timeout, epoch, metrics, - }) + } } pub(crate) async fn get_permit(&self, key: &K) -> Result { diff --git a/proxy/src/control_plane/messages.rs b/proxy/src/control_plane/messages.rs index 5883d02b92..8d6b2e96f5 100644 --- a/proxy/src/control_plane/messages.rs +++ b/proxy/src/control_plane/messages.rs @@ -361,7 +361,8 @@ pub struct EndpointJwksResponse { pub struct JwksSettings { pub id: String, pub jwks_url: url::Url, - pub provider_name: String, + #[serde(rename = "provider_name")] + pub _provider_name: String, pub jwt_audience: Option, pub role_names: Vec, } diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index c56474edd7..a9e5fbc85b 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -72,34 +72,36 @@ // List of temporarily allowed lints to unblock beta/nightly. #![allow(unknown_lints)] -pub mod auth; -pub mod cache; -pub mod cancellation; -pub mod compute; -pub mod compute_ctl; -pub mod config; -pub mod console_redirect_proxy; -pub mod context; -pub mod control_plane; -pub mod error; +pub mod binary; + +mod auth; +mod cache; +mod cancellation; +mod compute; +mod compute_ctl; +mod config; +mod console_redirect_proxy; +mod context; +mod control_plane; +mod error; mod ext; -pub mod http; -pub mod intern; -pub mod jemalloc; -pub mod logging; -pub mod metrics; -pub mod parse; -pub mod protocol2; -pub mod proxy; -pub mod rate_limiter; -pub mod redis; -pub mod sasl; -pub mod scram; -pub mod serverless; -pub mod signals; -pub mod stream; -pub mod tls; -pub mod types; -pub mod url; -pub mod usage_metrics; -pub mod waiters; +mod http; +mod intern; +mod jemalloc; +mod logging; +mod metrics; +mod parse; +mod protocol2; +mod proxy; +mod rate_limiter; +mod redis; +mod sasl; +mod scram; +mod serverless; +mod signals; +mod stream; +mod tls; +mod types; +mod url; +mod usage_metrics; +mod waiters; diff --git a/proxy/src/metrics.rs b/proxy/src/metrics.rs index 25bcc81108..f3447e063e 100644 --- a/proxy/src/metrics.rs +++ b/proxy/src/metrics.rs @@ -205,7 +205,7 @@ pub enum Protocol { } impl Protocol { - pub fn as_str(&self) -> &'static str { + pub fn as_str(self) -> &'static str { match self { Protocol::Http => "http", Protocol::Ws => "ws", @@ -385,6 +385,7 @@ pub enum Waiting { #[derive(FixedCardinalityLabel, Copy, Clone)] #[label(singleton = "kind")] +#[allow(clippy::enum_variant_names)] pub enum RedisMsgKind { HSet, HSetMultiple, diff --git a/proxy/src/redis/cancellation_publisher.rs b/proxy/src/redis/cancellation_publisher.rs index 30d8b83e60..186fece4b2 100644 --- a/proxy/src/redis/cancellation_publisher.rs +++ b/proxy/src/redis/cancellation_publisher.rs @@ -5,9 +5,6 @@ use pq_proto::CancelKeyData; use tokio::sync::Mutex; use uuid::Uuid; -use super::connection_with_credentials_provider::ConnectionWithCredentialsProvider; -use crate::rate_limiter::{GlobalRateLimiter, RateBucketInfo}; - pub trait CancellationPublisherMut: Send + Sync + 'static { #[allow(async_fn_in_trait)] async fn try_publish( @@ -79,36 +76,3 @@ impl CancellationPublisher for Arc> { .await } } - -pub struct RedisPublisherClient { - #[allow(dead_code)] - client: ConnectionWithCredentialsProvider, - _region_id: String, - _limiter: GlobalRateLimiter, -} - -impl RedisPublisherClient { - pub fn new( - client: 
ConnectionWithCredentialsProvider,
-        region_id: String,
-        info: &'static [RateBucketInfo],
-    ) -> anyhow::Result<Self> {
-        Ok(Self {
-            client,
-            _region_id: region_id,
-            _limiter: GlobalRateLimiter::new(info.into()),
-        })
-    }
-
-    #[allow(dead_code)]
-    pub(crate) async fn try_connect(&mut self) -> anyhow::Result<()> {
-        match self.client.connect().await {
-            Ok(()) => {}
-            Err(e) => {
-                tracing::error!("failed to connect to redis: {e}");
-                return Err(e);
-            }
-        }
-        Ok(())
-    }
-}
diff --git a/proxy/src/serverless/sql_over_http.rs b/proxy/src/serverless/sql_over_http.rs
index 8739ce49f9..2eee3b7165 100644
--- a/proxy/src/serverless/sql_over_http.rs
+++ b/proxy/src/serverless/sql_over_http.rs
@@ -23,7 +23,6 @@
 use tokio_util::sync::CancellationToken;
 use tracing::{debug, error, info};
 use typed_json::json;
 use url::Url;
-use urlencoding;
 use utils::http::error::ApiError;
 use uuid::Uuid;
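
Reviewer note: with the main() bodies moved into proxy/src/binary/*, each proxy/src/bin/*.rs shrinks to a thin shim that installs the allocator and delegates to the matching run() (this diff shows the new body of bin/proxy.rs as `proxy::binary::proxy::run().await`). A minimal sketch of that shape, assuming bin/pg_sni_router.rs follows the same pattern (its bin-side wiring is not part of the hunks above):

    // Hypothetical thin entrypoint; the real bin/pg_sni_router.rs change is not
    // shown in this diff, but binary/mod.rs exports pg_sni_router, and
    // pg_sni_router::run() is defined above as `pub async fn run() -> anyhow::Result<()>`.
    #[global_allocator]
    static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

    #[tokio::main]
    async fn main() -> anyhow::Result<()> {
        proxy::binary::pg_sni_router::run().await
    }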