proxy: add local-proxy to compute image (#8823)

1. Adds local-proxy to compute image and vm spec
2. Updates local-proxy config processing, writing PID to a file eagerly
3. Updates compute-ctl to understand local proxy compute spec and to
send SIGHUP to local-proxy over that pid.

closes https://github.com/neondatabase/cloud/issues/16867
This commit is contained in:
Conrad Ludgate
2024-10-04 15:52:01 +01:00
committed by GitHub
parent db53f98725
commit 6c05f89f7d
11 changed files with 175 additions and 15 deletions

View File

@@ -77,10 +77,10 @@ struct LocalProxyCliArgs {
#[clap(long, default_value = "127.0.0.1:5432")]
compute: SocketAddr,
/// Path of the local proxy config file
#[clap(long, default_value = "./localproxy.json")]
#[clap(long, default_value = "./local_proxy.json")]
config_path: Utf8PathBuf,
/// Path of the local proxy PID file
#[clap(long, default_value = "./localproxy.pid")]
#[clap(long, default_value = "./local_proxy.pid")]
pid_path: Utf8PathBuf,
}
@@ -109,7 +109,7 @@ struct SqlOverHttpArgs {
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let _logging_guard = proxy::logging::init().await?;
let _logging_guard = proxy::logging::init_local_proxy()?;
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
@@ -138,7 +138,7 @@ async fn main() -> anyhow::Result<()> {
// in order to trigger the appropriate SIGHUP on config change.
//
// This also claims a "lock" that makes sure only one instance
// of local-proxy runs at a time.
// of local_proxy runs at a time.
let _process_guard = loop {
match pid_file::claim_for_current_process(&args.pid_path) {
Ok(guard) => break guard,
@@ -164,12 +164,6 @@ async fn main() -> anyhow::Result<()> {
16,
));
// write the process ID to a file so that compute-ctl can find our process later
// in order to trigger the appropriate SIGHUP on config change.
let pid = std::process::id();
info!("process running in PID {pid}");
std::fs::write(args.pid_path, format!("{pid}\n")).context("writing PID to file")?;
let mut maintenance_tasks = JoinSet::new();
let refresh_config_notify = Arc::new(Notify::new());
@@ -182,9 +176,9 @@ async fn main() -> anyhow::Result<()> {
// trigger the first config load **after** setting up the signal hook
// to avoid the race condition where:
// 1. No config file registered when local-proxy starts up
// 1. No config file registered when local_proxy starts up
// 2. The config file is written but the signal hook is not yet received
// 3. local-proxy completes startup but has no config loaded, despite there being a registerd config.
// 3. local_proxy completes startup but has no config loaded, despite there being a registerd config.
refresh_config_notify.notify_one();
tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify));
@@ -311,7 +305,7 @@ async fn refresh_config_inner(path: &Utf8Path) -> anyhow::Result<()> {
let mut jwks_set = vec![];
for jwks in data.jwks {
for jwks in data.jwks.into_iter().flatten() {
let mut jwks_url = url::Url::from_str(&jwks.jwks_url).context("parsing JWKS url")?;
ensure!(

View File

@@ -1,6 +1,13 @@
use tracing::Subscriber;
use tracing_subscriber::{
filter::{EnvFilter, LevelFilter},
fmt::{
format::{Format, Full},
time::SystemTime,
FormatEvent, FormatFields,
},
prelude::*,
registry::LookupSpan,
};
/// Initialize logging and OpenTelemetry tracing and exporter.
@@ -33,6 +40,45 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
Ok(LoggingGuard)
}
/// Initialize logging for local_proxy with log prefix and no opentelemetry.
///
/// Logging can be configured using `RUST_LOG` environment variable.
pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
let env_filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(std::io::stderr)
.event_format(LocalProxyFormatter(Format::default().with_target(false)));
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.try_init()?;
Ok(LoggingGuard)
}
pub struct LocalProxyFormatter(Format<Full, SystemTime>);
impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
where
S: Subscriber + for<'a> LookupSpan<'a>,
N: for<'a> FormatFields<'a> + 'static,
{
fn format_event(
&self,
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
mut writer: tracing_subscriber::fmt::format::Writer<'_>,
event: &tracing::Event<'_>,
) -> std::fmt::Result {
writer.write_str("[local_proxy] ")?;
self.0.format_event(ctx, writer, event)
}
}
pub struct LoggingGuard;
impl Drop for LoggingGuard {