diff --git a/Cargo.lock b/Cargo.lock index d8429c4183..6ae5aac127 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1265,6 +1265,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", + "camino", "cfg-if", "chrono", "clap", diff --git a/compute/Dockerfile.compute-node b/compute/Dockerfile.compute-node index 6f2a6597be..5332b9ca1f 100644 --- a/compute/Dockerfile.compute-node +++ b/compute/Dockerfile.compute-node @@ -1075,6 +1075,20 @@ RUN set -e \ && make -j $(nproc) dist_man_MANS= \ && make install dist_man_MANS= +######################################################################################### +# +# Compile the Neon-specific `local_proxy` binary +# +######################################################################################### +FROM $REPOSITORY/$IMAGE:$TAG AS local_proxy +ARG BUILD_TAG +ENV BUILD_TAG=$BUILD_TAG + +USER nonroot +# Copy entire project to get Cargo.* files with proper dependencies for the whole project +COPY --chown=nonroot . . +RUN mold -run cargo build --locked --profile release-line-debug-size-lto --bin local_proxy + ######################################################################################### # # Layers "postgres-exporter" and "sql-exporter" @@ -1213,6 +1227,10 @@ COPY --from=compute-tools --chown=postgres /home/nonroot/target/release-line-deb COPY --from=pgbouncer /usr/local/pgbouncer/bin/pgbouncer /usr/local/bin/pgbouncer COPY --chmod=0666 --chown=postgres compute/etc/pgbouncer.ini /etc/pgbouncer.ini +# local_proxy and its config +COPY --from=local_proxy --chown=postgres /home/nonroot/target/release-line-debug-size-lto/local_proxy /usr/local/bin/local_proxy +RUN mkdir -p /etc/local_proxy && chown postgres:postgres /etc/local_proxy + # Metrics exporter binaries and configuration files COPY --from=postgres-exporter /bin/postgres_exporter /bin/postgres_exporter COPY --from=sql-exporter /bin/sql_exporter /bin/sql_exporter diff --git a/compute/vm-image-spec.yaml b/compute/vm-image-spec.yaml index 50fcd62e4f..43e57a4ed5 100644 --- a/compute/vm-image-spec.yaml +++ b/compute/vm-image-spec.yaml @@ -19,6 +19,10 @@ commands: user: postgres sysvInitAction: respawn shell: '/usr/local/bin/pgbouncer /etc/pgbouncer.ini' + - name: local_proxy + user: postgres + sysvInitAction: respawn + shell: '/usr/local/bin/local_proxy --config-path /etc/local_proxy/config.json --pid-path /etc/local_proxy/pid --http 0.0.0.0:10432' - name: postgres-exporter user: nobody sysvInitAction: respawn diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml index 6bb3e211b6..91e0b9d5b8 100644 --- a/compute_tools/Cargo.toml +++ b/compute_tools/Cargo.toml @@ -11,6 +11,7 @@ testing = [] [dependencies] anyhow.workspace = true +camino.workspace = true chrono.workspace = true cfg-if.workspace = true clap.workspace = true diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 2f6e2bdb2c..ba7b4f37df 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -34,6 +34,7 @@ use nix::sys::signal::{kill, Signal}; use remote_storage::{DownloadError, RemotePath}; use crate::checker::create_availability_check_data; +use crate::local_proxy; use crate::logger::inlinify; use crate::pg_helpers::*; use crate::spec::*; @@ -886,6 +887,11 @@ impl ComputeNode { // 'Close' connection drop(client); + if let Some(ref local_proxy) = spec.local_proxy_config { + info!("configuring local_proxy"); + local_proxy::configure(local_proxy).context("apply_config local_proxy")?; + } + // Run migrations separately to not hold up cold starts thread::spawn(move || { let mut connstr = connstr.clone(); @@ -936,6 +942,19 @@ impl ComputeNode { }); } + if let Some(ref local_proxy) = spec.local_proxy_config { + info!("configuring local_proxy"); + + // Spawn a thread to do the configuration, + // so that we don't block the main thread that starts Postgres. + let local_proxy = local_proxy.clone(); + let _handle = Some(thread::spawn(move || { + if let Err(err) = local_proxy::configure(&local_proxy) { + error!("error while configuring local_proxy: {err:?}"); + } + })); + } + // Write new config let pgdata_path = Path::new(&self.pgdata); let postgresql_conf_path = pgdata_path.join("postgresql.conf"); @@ -1023,6 +1042,19 @@ impl ComputeNode { }); } + if let Some(local_proxy) = &pspec.spec.local_proxy_config { + info!("configuring local_proxy"); + + // Spawn a thread to do the configuration, + // so that we don't block the main thread that starts Postgres. + let local_proxy = local_proxy.clone(); + let _handle = thread::spawn(move || { + if let Err(err) = local_proxy::configure(&local_proxy) { + error!("error while configuring local_proxy: {err:?}"); + } + }); + } + info!( "start_compute spec.remote_extensions {:?}", pspec.spec.remote_extensions diff --git a/compute_tools/src/lib.rs b/compute_tools/src/lib.rs index 0795eb6171..477f423aa2 100644 --- a/compute_tools/src/lib.rs +++ b/compute_tools/src/lib.rs @@ -15,6 +15,7 @@ pub mod catalog; pub mod compute; pub mod disk_quota; pub mod extension_server; +pub mod local_proxy; pub mod lsn_lease; mod migration; pub mod monitor; diff --git a/compute_tools/src/local_proxy.rs b/compute_tools/src/local_proxy.rs new file mode 100644 index 0000000000..3de3c58786 --- /dev/null +++ b/compute_tools/src/local_proxy.rs @@ -0,0 +1,56 @@ +//! Local Proxy is a feature of our BaaS Neon Authorize project. +//! +//! Local Proxy validates JWTs and manages the pg_session_jwt extension. +//! It also maintains a connection pool to postgres. + +use anyhow::{Context, Result}; +use camino::Utf8Path; +use compute_api::spec::LocalProxySpec; +use nix::sys::signal::Signal; +use utils::pid_file::{self, PidFileRead}; + +pub fn configure(local_proxy: &LocalProxySpec) -> Result<()> { + write_local_proxy_conf("/etc/local_proxy/config.json".as_ref(), local_proxy)?; + notify_local_proxy("/etc/local_proxy/pid".as_ref())?; + + Ok(()) +} + +/// Create or completely rewrite configuration file specified by `path` +fn write_local_proxy_conf(path: &Utf8Path, local_proxy: &LocalProxySpec) -> Result<()> { + let config = + serde_json::to_string_pretty(local_proxy).context("serializing LocalProxySpec to json")?; + std::fs::write(path, config).with_context(|| format!("writing {path}"))?; + + Ok(()) +} + +/// Notify local proxy about a new config file. +fn notify_local_proxy(path: &Utf8Path) -> Result<()> { + match pid_file::read(path)? { + // if the file doesn't exist, or isn't locked, local_proxy isn't running + // and will naturally pick up our config later + PidFileRead::NotExist | PidFileRead::NotHeldByAnyProcess(_) => {} + PidFileRead::LockedByOtherProcess(pid) => { + // From the pid_file docs: + // + // > 1. The other process might exit at any time, turning the given PID stale. + // > 2. There is a small window in which `claim_for_current_process` has already + // > locked the file but not yet updates its contents. [`read`] will return + // > this variant here, but with the old file contents, i.e., a stale PID. + // > + // > The kernel is free to recycle PID once it has been `wait(2)`ed upon by + // > its creator. Thus, acting upon a stale PID, e.g., by issuing a `kill` + // > system call on it, bears the risk of killing an unrelated process. + // > This is an inherent limitation of using pidfiles. + // > The only race-free solution is to have a supervisor-process with a lifetime + // > that exceeds that of all of its child-processes (e.g., `runit`, `supervisord`). + // + // This is an ok risk as we only send a SIGHUP which likely won't actually + // kill the process, only reload config. + nix::sys::signal::kill(pid, Signal::SIGHUP).context("sending signal to local_proxy")?; + } + } + + Ok(()) +} diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 18f396b886..7cdf621737 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -599,6 +599,7 @@ impl Endpoint { remote_extensions, pgbouncer_settings: None, shard_stripe_size: Some(shard_stripe_size), + local_proxy_config: None, }; let spec_path = self.endpoint_path().join("spec.json"); std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?; diff --git a/libs/compute_api/src/spec.rs b/libs/compute_api/src/spec.rs index 83515a00a0..5903db7055 100644 --- a/libs/compute_api/src/spec.rs +++ b/libs/compute_api/src/spec.rs @@ -106,6 +106,10 @@ pub struct ComputeSpec { // Stripe size for pageserver sharding, in pages #[serde(default)] pub shard_stripe_size: Option, + + /// Local Proxy configuration used for JWT authentication + #[serde(default)] + pub local_proxy_config: Option, } /// Feature flag to signal `compute_ctl` to enable certain experimental functionality. @@ -278,11 +282,13 @@ pub struct GenericOption { /// declare a `trait` on it. pub type GenericOptions = Option>; -/// Configured the local-proxy application with the relevant JWKS and roles it should +/// Configured the local_proxy application with the relevant JWKS and roles it should /// use for authorizing connect requests using JWT. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct LocalProxySpec { - pub jwks: Vec, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + pub jwks: Option>, } #[derive(Clone, Debug, Deserialize, Serialize)] diff --git a/proxy/src/bin/local_proxy.rs b/proxy/src/bin/local_proxy.rs index b18810adbe..d5ce1e9273 100644 --- a/proxy/src/bin/local_proxy.rs +++ b/proxy/src/bin/local_proxy.rs @@ -77,10 +77,10 @@ struct LocalProxyCliArgs { #[clap(long, default_value = "127.0.0.1:5432")] compute: SocketAddr, /// Path of the local proxy config file - #[clap(long, default_value = "./localproxy.json")] + #[clap(long, default_value = "./local_proxy.json")] config_path: Utf8PathBuf, /// Path of the local proxy PID file - #[clap(long, default_value = "./localproxy.pid")] + #[clap(long, default_value = "./local_proxy.pid")] pid_path: Utf8PathBuf, } @@ -109,7 +109,7 @@ struct SqlOverHttpArgs { #[tokio::main] async fn main() -> anyhow::Result<()> { - let _logging_guard = proxy::logging::init().await?; + let _logging_guard = proxy::logging::init_local_proxy()?; let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); @@ -138,7 +138,7 @@ async fn main() -> anyhow::Result<()> { // in order to trigger the appropriate SIGHUP on config change. // // This also claims a "lock" that makes sure only one instance - // of local-proxy runs at a time. + // of local_proxy runs at a time. let _process_guard = loop { match pid_file::claim_for_current_process(&args.pid_path) { Ok(guard) => break guard, @@ -164,12 +164,6 @@ async fn main() -> anyhow::Result<()> { 16, )); - // write the process ID to a file so that compute-ctl can find our process later - // in order to trigger the appropriate SIGHUP on config change. - let pid = std::process::id(); - info!("process running in PID {pid}"); - std::fs::write(args.pid_path, format!("{pid}\n")).context("writing PID to file")?; - let mut maintenance_tasks = JoinSet::new(); let refresh_config_notify = Arc::new(Notify::new()); @@ -182,9 +176,9 @@ async fn main() -> anyhow::Result<()> { // trigger the first config load **after** setting up the signal hook // to avoid the race condition where: - // 1. No config file registered when local-proxy starts up + // 1. No config file registered when local_proxy starts up // 2. The config file is written but the signal hook is not yet received - // 3. local-proxy completes startup but has no config loaded, despite there being a registerd config. + // 3. local_proxy completes startup but has no config loaded, despite there being a registerd config. refresh_config_notify.notify_one(); tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify)); @@ -311,7 +305,7 @@ async fn refresh_config_inner(path: &Utf8Path) -> anyhow::Result<()> { let mut jwks_set = vec![]; - for jwks in data.jwks { + for jwks in data.jwks.into_iter().flatten() { let mut jwks_url = url::Url::from_str(&jwks.jwks_url).context("parsing JWKS url")?; ensure!( diff --git a/proxy/src/logging.rs b/proxy/src/logging.rs index 2e773fabb3..a34eb820f8 100644 --- a/proxy/src/logging.rs +++ b/proxy/src/logging.rs @@ -1,6 +1,13 @@ +use tracing::Subscriber; use tracing_subscriber::{ filter::{EnvFilter, LevelFilter}, + fmt::{ + format::{Format, Full}, + time::SystemTime, + FormatEvent, FormatFields, + }, prelude::*, + registry::LookupSpan, }; /// Initialize logging and OpenTelemetry tracing and exporter. @@ -33,6 +40,45 @@ pub async fn init() -> anyhow::Result { Ok(LoggingGuard) } +/// Initialize logging for local_proxy with log prefix and no opentelemetry. +/// +/// Logging can be configured using `RUST_LOG` environment variable. +pub fn init_local_proxy() -> anyhow::Result { + let env_filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + + let fmt_layer = tracing_subscriber::fmt::layer() + .with_ansi(false) + .with_writer(std::io::stderr) + .event_format(LocalProxyFormatter(Format::default().with_target(false))); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .try_init()?; + + Ok(LoggingGuard) +} + +pub struct LocalProxyFormatter(Format); + +impl FormatEvent for LocalProxyFormatter +where + S: Subscriber + for<'a> LookupSpan<'a>, + N: for<'a> FormatFields<'a> + 'static, +{ + fn format_event( + &self, + ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>, + mut writer: tracing_subscriber::fmt::format::Writer<'_>, + event: &tracing::Event<'_>, + ) -> std::fmt::Result { + writer.write_str("[local_proxy] ")?; + self.0.format_event(ctx, writer, event) + } +} + pub struct LoggingGuard; impl Drop for LoggingGuard {