mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-17 18:32:56 +00:00
Compare commits
4 Commits
release-80
...
local-prox
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
87c793f58c | ||
|
|
52a7d780ad | ||
|
|
2255a8ebac | ||
|
|
e109d5aac0 |
4
.github/workflows/build_and_test.yml
vendored
4
.github/workflows/build_and_test.yml
vendored
@@ -33,7 +33,7 @@ jobs:
|
||||
github-event-name: ${{ github.event_name }}
|
||||
|
||||
cancel-previous-e2e-tests:
|
||||
needs: [ check-permissions ]
|
||||
needs: [ check-permissions, promote-images, tag ]
|
||||
if: github.event_name == 'pull_request'
|
||||
runs-on: ubuntu-22.04
|
||||
|
||||
@@ -518,7 +518,7 @@ jobs:
|
||||
|
||||
trigger-e2e-tests:
|
||||
if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'run-e2e-tests-in-draft') || github.ref_name == 'main' || github.ref_name == 'release' || github.ref_name == 'release-proxy' }}
|
||||
needs: [ check-permissions, promote-images, tag ]
|
||||
needs: [ check-permissions, promote-images, tag, cancel-previous-e2e-tests ]
|
||||
uses: ./.github/workflows/trigger-e2e-tests.yml
|
||||
secrets: inherit
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ testing = []
|
||||
|
||||
[dependencies]
|
||||
anyhow.workspace = true
|
||||
# camino.workspace = true
|
||||
chrono.workspace = true
|
||||
cfg-if.workspace = true
|
||||
clap.workspace = true
|
||||
|
||||
@@ -264,68 +264,72 @@ async fn handle_configure_request(
|
||||
|
||||
let body_bytes = hyper::body::to_bytes(req.into_body()).await.unwrap();
|
||||
let spec_raw = String::from_utf8(body_bytes.to_vec()).unwrap();
|
||||
if let Ok(request) = serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
|
||||
let spec = request.spec;
|
||||
match serde_json::from_str::<ConfigurationRequest>(&spec_raw) {
|
||||
Ok(request) => {
|
||||
let spec = request.spec;
|
||||
|
||||
let parsed_spec = match ParsedSpec::try_from(spec) {
|
||||
Ok(ps) => ps,
|
||||
Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
|
||||
};
|
||||
let parsed_spec = match ParsedSpec::try_from(spec) {
|
||||
Ok(ps) => ps,
|
||||
Err(msg) => return Err((msg, StatusCode::BAD_REQUEST)),
|
||||
};
|
||||
|
||||
// XXX: wrap state update under lock in code blocks. Otherwise,
|
||||
// we will try to `Send` `mut state` into the spawned thread
|
||||
// bellow, which will cause error:
|
||||
// ```
|
||||
// error: future cannot be sent between threads safely
|
||||
// ```
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for configuration request: {:?}",
|
||||
state.status.clone()
|
||||
);
|
||||
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
||||
}
|
||||
state.pspec = Some(parsed_spec);
|
||||
state.status = ComputeStatus::ConfigurationPending;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
info!("set new spec and notified waiters");
|
||||
}
|
||||
|
||||
// Spawn a blocking thread to wait for compute to become Running.
|
||||
// This is needed to do not block the main pool of workers and
|
||||
// be able to serve other requests while some particular request
|
||||
// is waiting for compute to finish configuration.
|
||||
let c = compute.clone();
|
||||
task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Running {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
"waiting for compute to become Running, current status: {:?}",
|
||||
state.status
|
||||
);
|
||||
|
||||
if state.status == ComputeStatus::Failed {
|
||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||
let msg = format!("compute configuration failed: {:?}", err);
|
||||
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
||||
// XXX: wrap state update under lock in code blocks. Otherwise,
|
||||
// we will try to `Send` `mut state` into the spawned thread
|
||||
// bellow, which will cause error:
|
||||
// ```
|
||||
// error: future cannot be sent between threads safely
|
||||
// ```
|
||||
{
|
||||
let mut state = compute.state.lock().unwrap();
|
||||
if state.status != ComputeStatus::Empty && state.status != ComputeStatus::Running {
|
||||
let msg = format!(
|
||||
"invalid compute status for configuration request: {:?}",
|
||||
state.status.clone()
|
||||
);
|
||||
return Err((msg, StatusCode::PRECONDITION_FAILED));
|
||||
}
|
||||
state.pspec = Some(parsed_spec);
|
||||
state.status = ComputeStatus::ConfigurationPending;
|
||||
compute.state_changed.notify_all();
|
||||
drop(state);
|
||||
info!("set new spec and notified waiters");
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
// Spawn a blocking thread to wait for compute to become Running.
|
||||
// This is needed to do not block the main pool of workers and
|
||||
// be able to serve other requests while some particular request
|
||||
// is waiting for compute to finish configuration.
|
||||
let c = compute.clone();
|
||||
task::spawn_blocking(move || {
|
||||
let mut state = c.state.lock().unwrap();
|
||||
while state.status != ComputeStatus::Running {
|
||||
state = c.state_changed.wait(state).unwrap();
|
||||
info!(
|
||||
"waiting for compute to become Running, current status: {:?}",
|
||||
state.status
|
||||
);
|
||||
|
||||
// Return current compute state if everything went well.
|
||||
let state = compute.state.lock().unwrap().clone();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Ok(serde_json::to_string(&status_response).unwrap())
|
||||
} else {
|
||||
Err(("invalid spec".to_string(), StatusCode::BAD_REQUEST))
|
||||
if state.status == ComputeStatus::Failed {
|
||||
let err = state.error.as_ref().map_or("unknown error", |x| x);
|
||||
let msg = format!("compute configuration failed: {:?}", err);
|
||||
return Err((msg, StatusCode::INTERNAL_SERVER_ERROR));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await
|
||||
.unwrap()?;
|
||||
|
||||
// Return current compute state if everything went well.
|
||||
let state = compute.state.lock().unwrap().clone();
|
||||
let status_response = status_response_from_state(&state);
|
||||
Ok(serde_json::to_string(&status_response).unwrap())
|
||||
}
|
||||
Err(err) => {
|
||||
error!("could not parse spec: {spec_raw}");
|
||||
Err((format!("invalid spec: {err:?}"), StatusCode::BAD_REQUEST))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ pub mod catalog;
|
||||
pub mod compute;
|
||||
pub mod disk_quota;
|
||||
pub mod extension_server;
|
||||
// pub mod local_proxy;
|
||||
pub mod lsn_lease;
|
||||
mod migration;
|
||||
pub mod monitor;
|
||||
|
||||
56
compute_tools/src/local_proxy.rs
Normal file
56
compute_tools/src/local_proxy.rs
Normal file
@@ -0,0 +1,56 @@
|
||||
//! Local Proxy is a feature of our BaaS Neon Authorize project.
|
||||
//!
|
||||
//! Local Proxy validates JWTs and manages the pg_session_jwt extension.
|
||||
//! It also maintains a connection pool to postgres.
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use camino::Utf8Path;
|
||||
use compute_api::spec::LocalProxySpec;
|
||||
use nix::sys::signal::Signal;
|
||||
use utils::pid_file::{self, PidFileRead};
|
||||
|
||||
pub fn configure(local_proxy: &LocalProxySpec) -> Result<()> {
|
||||
write_local_proxy_conf("/etc/local_proxy/config.json".as_ref(), local_proxy)?;
|
||||
notify_local_proxy("/etc/local_proxy/pid".as_ref())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Create or completely rewrite configuration file specified by `path`
|
||||
fn write_local_proxy_conf(path: &Utf8Path, local_proxy: &LocalProxySpec) -> Result<()> {
|
||||
let config =
|
||||
serde_json::to_string_pretty(local_proxy).context("serializing LocalProxySpec to json")?;
|
||||
std::fs::write(path, config).with_context(|| format!("writing {path}"))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Notify local proxy about a new config file.
|
||||
fn notify_local_proxy(path: &Utf8Path) -> Result<()> {
|
||||
match pid_file::read(path)? {
|
||||
// if the file doesn't exist, or isn't locked, local_proxy isn't running
|
||||
// and will naturally pick up our config later
|
||||
PidFileRead::NotExist | PidFileRead::NotHeldByAnyProcess(_) => {}
|
||||
PidFileRead::LockedByOtherProcess(pid) => {
|
||||
// From the pid_file docs:
|
||||
//
|
||||
// > 1. The other process might exit at any time, turning the given PID stale.
|
||||
// > 2. There is a small window in which `claim_for_current_process` has already
|
||||
// > locked the file but not yet updates its contents. [`read`] will return
|
||||
// > this variant here, but with the old file contents, i.e., a stale PID.
|
||||
// >
|
||||
// > The kernel is free to recycle PID once it has been `wait(2)`ed upon by
|
||||
// > its creator. Thus, acting upon a stale PID, e.g., by issuing a `kill`
|
||||
// > system call on it, bears the risk of killing an unrelated process.
|
||||
// > This is an inherent limitation of using pidfiles.
|
||||
// > The only race-free solution is to have a supervisor-process with a lifetime
|
||||
// > that exceeds that of all of its child-processes (e.g., `runit`, `supervisord`).
|
||||
//
|
||||
// This is an ok risk as we only send a SIGHUP which likely won't actually
|
||||
// kill the process, only reload config.
|
||||
nix::sys::signal::kill(pid, Signal::SIGHUP).context("sending signal to local_proxy")?;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -599,6 +599,7 @@ impl Endpoint {
|
||||
remote_extensions,
|
||||
pgbouncer_settings: None,
|
||||
shard_stripe_size: Some(shard_stripe_size),
|
||||
local_proxy_config: None,
|
||||
};
|
||||
let spec_path = self.endpoint_path().join("spec.json");
|
||||
std::fs::write(spec_path, serde_json::to_string_pretty(&spec)?)?;
|
||||
|
||||
@@ -106,6 +106,11 @@ pub struct ComputeSpec {
|
||||
// Stripe size for pageserver sharding, in pages
|
||||
#[serde(default)]
|
||||
pub shard_stripe_size: Option<usize>,
|
||||
|
||||
/// Local Proxy configuration used for JWT authentication
|
||||
#[serde(default)]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub local_proxy_config: Option<LocalProxySpec>,
|
||||
}
|
||||
|
||||
/// Feature flag to signal `compute_ctl` to enable certain experimental functionality.
|
||||
|
||||
@@ -109,7 +109,7 @@ struct SqlOverHttpArgs {
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> anyhow::Result<()> {
|
||||
let _logging_guard = proxy::logging::init().await?;
|
||||
let _logging_guard = proxy::logging::init_local_proxy()?;
|
||||
let _panic_hook_guard = utils::logging::replace_panic_hook_with_tracing_panic_hook();
|
||||
let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]);
|
||||
|
||||
@@ -138,7 +138,7 @@ async fn main() -> anyhow::Result<()> {
|
||||
// in order to trigger the appropriate SIGHUP on config change.
|
||||
//
|
||||
// This also claims a "lock" that makes sure only one instance
|
||||
// of local-proxy runs at a time.
|
||||
// of local_proxy runs at a time.
|
||||
let _process_guard = loop {
|
||||
match pid_file::claim_for_current_process(&args.pid_path) {
|
||||
Ok(guard) => break guard,
|
||||
@@ -164,12 +164,6 @@ async fn main() -> anyhow::Result<()> {
|
||||
16,
|
||||
));
|
||||
|
||||
// write the process ID to a file so that compute-ctl can find our process later
|
||||
// in order to trigger the appropriate SIGHUP on config change.
|
||||
let pid = std::process::id();
|
||||
info!("process running in PID {pid}");
|
||||
std::fs::write(args.pid_path, format!("{pid}\n")).context("writing PID to file")?;
|
||||
|
||||
let mut maintenance_tasks = JoinSet::new();
|
||||
|
||||
let refresh_config_notify = Arc::new(Notify::new());
|
||||
@@ -182,9 +176,9 @@ async fn main() -> anyhow::Result<()> {
|
||||
|
||||
// trigger the first config load **after** setting up the signal hook
|
||||
// to avoid the race condition where:
|
||||
// 1. No config file registered when local-proxy starts up
|
||||
// 1. No config file registered when local_proxy starts up
|
||||
// 2. The config file is written but the signal hook is not yet received
|
||||
// 3. local-proxy completes startup but has no config loaded, despite there being a registerd config.
|
||||
// 3. local_proxy completes startup but has no config loaded, despite there being a registerd config.
|
||||
refresh_config_notify.notify_one();
|
||||
tokio::spawn(refresh_config_loop(args.config_path, refresh_config_notify));
|
||||
|
||||
|
||||
@@ -1,6 +1,13 @@
|
||||
use tracing::Subscriber;
|
||||
use tracing_subscriber::{
|
||||
filter::{EnvFilter, LevelFilter},
|
||||
fmt::{
|
||||
format::{Format, Full},
|
||||
time::SystemTime,
|
||||
FormatEvent, FormatFields,
|
||||
},
|
||||
prelude::*,
|
||||
registry::LookupSpan,
|
||||
};
|
||||
|
||||
/// Initialize logging and OpenTelemetry tracing and exporter.
|
||||
@@ -33,6 +40,45 @@ pub async fn init() -> anyhow::Result<LoggingGuard> {
|
||||
Ok(LoggingGuard)
|
||||
}
|
||||
|
||||
/// Initialize logging for local_proxy with log prefix and no opentelemetry.
|
||||
///
|
||||
/// Logging can be configured using `RUST_LOG` environment variable.
|
||||
pub fn init_local_proxy() -> anyhow::Result<LoggingGuard> {
|
||||
let env_filter = EnvFilter::builder()
|
||||
.with_default_directive(LevelFilter::INFO.into())
|
||||
.from_env_lossy();
|
||||
|
||||
let fmt_layer = tracing_subscriber::fmt::layer()
|
||||
.with_ansi(false)
|
||||
.with_writer(std::io::stderr)
|
||||
.event_format(LocalProxyFormatter(Format::default().with_target(false)));
|
||||
|
||||
tracing_subscriber::registry()
|
||||
.with(env_filter)
|
||||
.with(fmt_layer)
|
||||
.try_init()?;
|
||||
|
||||
Ok(LoggingGuard)
|
||||
}
|
||||
|
||||
pub struct LocalProxyFormatter(Format<Full, SystemTime>);
|
||||
|
||||
impl<S, N> FormatEvent<S, N> for LocalProxyFormatter
|
||||
where
|
||||
S: Subscriber + for<'a> LookupSpan<'a>,
|
||||
N: for<'a> FormatFields<'a> + 'static,
|
||||
{
|
||||
fn format_event(
|
||||
&self,
|
||||
ctx: &tracing_subscriber::fmt::FmtContext<'_, S, N>,
|
||||
mut writer: tracing_subscriber::fmt::format::Writer<'_>,
|
||||
event: &tracing::Event<'_>,
|
||||
) -> std::fmt::Result {
|
||||
writer.write_str("[local_proxy] ")?;
|
||||
self.0.format_event(ctx, writer, event)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct LoggingGuard;
|
||||
|
||||
impl Drop for LoggingGuard {
|
||||
|
||||
Reference in New Issue
Block a user